Trigger machine learning pipelines
APPLIES TO: Python SDK azureml v1
In this article, you'll learn how to programmatically schedule a pipeline to run on Azure. You can create a schedule based on elapsed time or on file-system changes. Time-based schedules can be used to take care of routine tasks, such as monitoring for data drift. Change-based schedules can be used to react to irregular or unpredictable changes, such as new data being uploaded or old data being edited. After learning how to create schedules, you'll learn how to retrieve and deactivate them. Finally, you'll learn how to use other Azure services, Azure Logic App and Azure Data Factory, to run pipelines. An Azure Logic App allows for more complex triggering logic or behavior. Azure Data Factory pipelines allow you to call a machine learning pipeline as part of a larger data orchestration pipeline.
Prerequisites
An Azure subscription. If you don’t have an Azure subscription, create a free account.
A Python environment in which the Azure Machine Learning SDK for Python is installed. For more information, see Create and manage reusable environments for training and deployment with Azure Machine Learning.
A Machine Learning workspace with a published pipeline. You can use the one built in Create and run machine learning pipelines with Azure Machine Learning SDK.
Trigger pipelines with Azure Machine Learning SDK for Python
To schedule a pipeline, you'll need a reference to your workspace, the identifier of your published pipeline, and the name of the experiment in which you wish to create the schedule. You can get these values with the following code:
import azureml.core
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.core.experiment import Experiment
ws = Workspace.from_config()
experiments = Experiment.list(ws)
for experiment in experiments:
print(experiment.name)
published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in published_pipelines:
print(f"{published_pipeline.name},'{published_pipeline.id}'")
experiment_name = "MyExperiment"
pipeline_id = "aaaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
Create a schedule
To run a pipeline on a recurring basis, you'll create a schedule. A Schedule
associates a pipeline, an experiment, and a trigger. The trigger can either be aScheduleRecurrence
that describes the wait between jobs or a Datastore path that specifies a directory to watch for changes. In either case, you'll need the pipeline identifier and the name of the experiment in which to create the schedule.
At the top of your Python file, import the Schedule
and ScheduleRecurrence
classes:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
Create a time-based schedule
The ScheduleRecurrence
constructor has a required frequency
argument that must be one of the following strings: "Minute", "Hour", "Day", "Week", or "Month". It also requires an integer interval
argument specifying how many of the frequency
units should elapse between schedule starts. Optional arguments allow you to be more specific about starting times, as detailed in the ScheduleRecurrence SDK docs.
Create a Schedule
that begins a job every 15 minutes:
recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule",
description="Based on time",
pipeline_id=pipeline_id,
experiment_name=experiment_name,
recurrence=recurrence)
Create a change-based schedule
Pipelines that are triggered by file changes may be more efficient than time-based schedules. When you want to do something before a file is changed, or when a new file is added to a data directory, you can preprocess that file. You can monitor any changes to a datastore or changes within a specific directory within the datastore. If you monitor a specific directory, changes within subdirectories of that directory will not trigger a job.
Note
Change-based schedules only supports monitoring Azure Blob storage.
To create a file-reactive Schedule
, you must set the datastore
parameter in the call to Schedule.create. To monitor a folder, set the path_on_datastore
argument.
The polling_interval
argument allows you to specify, in minutes, the frequency at which the datastore is checked for changes.
If the pipeline was constructed with a DataPath PipelineParameter, you can set that variable to the name of the changed file by setting the data_path_parameter_name
argument.
datastore = Datastore(workspace=ws, name="workspaceblobstore")
reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
pipeline_id=pipeline_id, experiment_name=experiment_name, datastore=datastore, data_path_parameter_name="input_data")
Optional arguments when creating a schedule
In addition to the arguments discussed previously, you may set the status
argument to "Disabled"
to create an inactive schedule. Finally, the continue_on_step_failure
allows you to pass a Boolean that will override the pipeline's default failure behavior.
View your scheduled pipelines
In your Web browser, navigate to Azure Machine Learning. From the Endpoints section of the navigation panel, choose Pipeline endpoints. This takes you to a list of the pipelines published in the Workspace.
In this page you can see summary information about all the pipelines in the Workspace: names, descriptions, status, and so forth. Drill in by clicking in your pipeline. On the resulting page, there are more details about your pipeline and you may drill down into individual jobs.
Deactivate the pipeline
If you have a Pipeline
that is published, but not scheduled, you can disable it with:
pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()
If the pipeline is scheduled, you must cancel the schedule first. Retrieve the schedule's identifier from the portal or by running:
ss = Schedule.list(ws)
for s in ss:
print(s)
Once you have the schedule_id
you wish to disable, run:
def stop_by_schedule_id(ws, schedule_id):
s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
s.disable()
return s
stop_by_schedule_id(ws, schedule_id)
If you then run Schedule.list(ws)
again, you should get an empty list.
Use Azure Logic Apps for complex triggers
More complex trigger rules or behavior can be created using an Azure Logic App.
To use an Azure Logic App to trigger a Machine Learning pipeline, you'll need the REST endpoint for a published Machine Learning pipeline. Create and publish your pipeline. Then find the REST endpoint of your PublishedPipeline
by using the pipeline ID:
# You can find the pipeline ID in Azure Machine Learning studio
published_pipeline = PublishedPipeline.get(ws, id="<pipeline-id-here>")
published_pipeline.endpoint
Create a logic app in Azure
Now create an Azure Logic App instance. After your logic app is provisioned, use these steps to configure a trigger for your pipeline:
Create a system-assigned managed identity to give the app access to your Azure Machine Learning Workspace.
Navigate to the Logic App Designer view and select the Blank Logic App template.
In the Designer, search for blob. Select the When a blob is added or modified (properties only) trigger and add this trigger to your Logic App.
Fill in the connection info for the Blob storage account you wish to monitor for blob additions or modifications. Select the Container to monitor.
Choose the Interval and Frequency to poll for updates that work for you.
Note
This trigger will monitor the selected Container but won't monitor subfolders.
Add an HTTP action that will run when a new or modified blob is detected. Select + New Step, then search for and select the HTTP action.
Use the following settings to configure your action:
Setting | Value |
---|---|
HTTP action | POST |
URI | the endpoint to the published pipeline that you found as a Prerequisite |
Authentication mode | Managed Identity |
Set up your schedule to set the value of any DataPath PipelineParameters you might have:
{ "DataPathAssignments": { "input_datapath": { "DataStoreName": "<datastore-name>", "RelativePath": "@{triggerBody()?['Name']}" } }, "ExperimentName": "MyRestPipeline", "ParameterAssignments": { "input_string": "sample_string3" }, "RunSource": "SDK" }
Use the
DataStoreName
you added to your workspace as a Prerequisite.Select Save and your schedule is now ready.
Important
If you are using Azure role-based access control (Azure RBAC) to manage access to your pipeline, set the permissions for your pipeline scenario (training or scoring).
Call machine learning pipelines from Azure Data Factory pipelines
In an Azure Data Factory pipeline, the Machine Learning Execute Pipeline activity runs an Azure Machine Learning pipeline. You can find this activity in the Data Factory's authoring page under the Machine Learning category:
Next steps
In this article, you used the Azure Machine Learning SDK for Python to schedule a pipeline in two different ways. One schedule recurs based on elapsed clock time. The other schedule jobs if a file is modified on a specified Datastore
or within a directory on that store. You saw how to use the portal to examine the pipeline and individual jobs. You learned how to disable a schedule so that the pipeline stops running. Finally, you created an Azure Logic App to trigger a pipeline.
For more information, see:
- Learn more about pipelines
- Learn more about exploring Azure Machine Learning with Jupyter