Schedule Class
Defines a schedule on which to submit a pipeline.
Once a Pipeline is published, a Schedule can be used to submit the Pipeline at a specified interval or when changes to a Blob storage location are detected.
Initialize Schedule.
- Inheritance: builtins.object → Schedule
Constructor
Schedule(workspace, id, name, description, pipeline_id, status, recurrence, datastore_name, polling_interval, data_path_parameter_name, continue_on_step_failure, path_on_datastore, _schedule_provider=None, pipeline_endpoint_id=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace object this Schedule will belong to. |
| id (Required) | The ID of the Schedule. |
| name (Required) | The name of the Schedule. |
| description (Required) | The description of the schedule. |
| pipeline_id (Required) | The ID of the pipeline the schedule will submit. |
| status (Required) | The status of the schedule, either 'Active' or 'Disabled'. |
| recurrence (Required) | The schedule recurrence for the pipeline. |
| datastore_name (Required) | The name of the datastore to monitor for modified/added blobs. Note: 1) VNET Datastores are not supported. 2) The authentication type for the datastore should be set to "Account key". |
| polling_interval (Required) | How long, in minutes, between polls for modified/added blobs. |
| data_path_parameter_name (Required) | The name of the data path pipeline parameter to set with the changed blob path. |
| continue_on_step_failure (Required) | Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this overrides the continue_on_step_failure setting of the Pipeline. |
| path_on_datastore (Required) | Optional. The path on the datastore to monitor for modified/added blobs. Note: path_on_datastore is relative to the datastore's container, so the schedule actually monitors container/path_on_datastore. If None, the datastore container itself is monitored. Additions/modifications made in a subfolder of path_on_datastore are not monitored. Only supported for DataStore schedules. |
| _schedule_provider (<xref:azureml.pipeline.core._aeva_provider._AevaScheduleProvider>) | The schedule provider. Default value: None |
| pipeline_endpoint_id | The ID of the pipeline endpoint the schedule will submit. Default value: None |
Remarks
Two types of schedules are supported. The first uses time recurrence to submit a Pipeline on a given schedule. The second monitors an AzureBlobDatastore for added or modified blobs and submits a Pipeline when changes are detected.
To create a Schedule which will submit a Pipeline on a recurring schedule, use the ScheduleRecurrence when creating the Schedule.
A ScheduleRecurrence is used when creating a Schedule for a Pipeline as follows:
from azureml.pipeline.core import Schedule, ScheduleRecurrence
recurrence = ScheduleRecurrence(frequency="Hour", interval=12)
schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                           experiment_name="helloworld", recurrence=recurrence)
This Schedule will submit the provided PublishedPipeline every 12 hours. The submitted Pipeline will be created under the Experiment with the name "helloworld".
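As a rough local illustration of that cadence, a fixed-interval recurrence can be sketched as repeated timedelta steps. This is a hypothetical simplification, not the Azure ML scheduler itself, which also honors start_time, time_zone, hours, minutes, and week_days; "Month" is omitted because it is not a fixed-length interval.

```python
from datetime import datetime, timedelta

# Hypothetical sketch: approximate fire times for a simple recurrence such as
# ScheduleRecurrence(frequency="Hour", interval=12). The real service also
# applies time zones and optional hour/minute/weekday filters.
FREQUENCY_TO_TIMEDELTA = {
    "Minute": timedelta(minutes=1),
    "Hour": timedelta(hours=1),
    "Day": timedelta(days=1),
    "Week": timedelta(weeks=1),
}

def next_fire_times(start, frequency, interval, count=3):
    """Return the next `count` fire times after `start` for a fixed-interval recurrence."""
    step = FREQUENCY_TO_TIMEDELTA[frequency] * interval
    return [start + step * i for i in range(1, count + 1)]

# Every 12 hours starting from 2019-06-07 10:50.
times = next_fire_times(datetime(2019, 6, 7, 10, 50), "Hour", 12)
```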
To create a Schedule which will trigger PipelineRuns on modifications to a Blob storage location, specify a Datastore and related data info when creating the Schedule.
from azureml.pipeline.core import Schedule
from azureml.core.datastore import Datastore
datastore = Datastore(workspace=ws, name="workspaceblobstore")
schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                           experiment_name="helloworld", datastore=datastore,
                           polling_interval=5, path_on_datastore="file/path")
Note that the polling_interval and path_on_datastore parameters are optional. The polling_interval specifies how often to poll for modifications to the Datastore, and by default is 5 minutes. path_on_datastore can be used to specify which folder on the Datastore to monitor for changes. If None, the Datastore container is monitored. Note: blob additions/modifications in sub-folders of the path_on_datastore or the Datastore container (if no path_on_datastore is specified) are not detected.
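Conceptually, the datastore trigger behaves like a periodic scan of the monitored path that compares blob modification times against the previous poll. The sketch below imitates that with local files; it is a hypothetical stand-in, not the service's implementation, and it deliberately scans only the top level of the folder, mirroring the rule that subfolders are not monitored.

```python
import os
import tempfile

def detect_changes(path, last_seen):
    """Return paths added or modified since the previous poll.

    `last_seen` maps path -> mtime from the prior poll; only files directly
    under `path` are considered, mirroring the no-subfolder rule.
    """
    changed = []
    for entry in os.scandir(path):
        if entry.is_file():
            mtime = entry.stat().st_mtime
            if last_seen.get(entry.path) != mtime:
                changed.append(entry.path)
            last_seen[entry.path] = mtime
    return changed

root = tempfile.mkdtemp()
seen = {}
detect_changes(root, seen)                        # first poll: empty folder
open(os.path.join(root, "new.csv"), "w").close()  # a "blob" is added
changed = detect_changes(root, seen)              # second poll sees the new file
```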
Additionally, if the Pipeline was constructed to use a DataPath PipelineParameter to describe a step input, use the data_path_parameter_name parameter when creating a Datastore-triggered Schedule to set the input to the changed file when a PipelineRun is submitted by the Schedule.
In the following example, when the Schedule triggers the PipelineRun, the value of the "input_data" PipelineParameter will be set as the file which was modified/added:
from azureml.pipeline.core import Schedule
from azureml.core.datastore import Datastore
datastore = Datastore(workspace=ws, name="workspaceblobstore")
schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                           experiment_name="helloworld", datastore=datastore,
                           data_path_parameter_name="input_data")
For more information on Schedules, see: https://aka.ms/pl-schedule.
Methods
| Method | Description |
|---|---|
| create | Create a schedule for a pipeline. Specify recurrence for a time-based schedule, or specify a Datastore (plus optional polling_interval and data_path_parameter_name) to create a schedule that monitors the Datastore location for modifications/additions. |
| create_for_pipeline_endpoint | Create a schedule for a pipeline endpoint. Specify recurrence for a time-based schedule, or specify a Datastore (plus optional polling_interval and data_path_parameter_name) to create a schedule that monitors the Datastore location for modifications/additions. |
| disable | Set the schedule to 'Disabled' and unavailable to run. |
| enable | Set the schedule to 'Active' and available to run. |
| get | Get the schedule with the given ID. |
| get_all | Get all schedules in the current workspace. DEPRECATED: use the list method instead. |
| get_last_pipeline_run | Fetch the last pipeline run submitted by the schedule. Returns None if no runs have been submitted. |
| get_pipeline_runs | Fetch the pipeline runs that were generated from the schedule. |
| get_schedules_for_pipeline_endpoint_id | Get all schedules for the given pipeline endpoint ID. |
| get_schedules_for_pipeline_id | Get all schedules for the given pipeline ID. |
| list | Get all schedules in the current workspace. |
| load_yaml | Load and read a YAML file to get schedule parameters; a YAML file is another way to pass the parameters needed to create a schedule. |
| update | Update the schedule. |
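The enable, disable, and update methods manage a simple 'Active'/'Disabled' state plus mutable settings. The toy stand-in below is purely hypothetical (no Azure calls) and only illustrates the lifecycle the methods above expose:

```python
class FakeSchedule:
    """Hypothetical in-memory stand-in illustrating the Schedule lifecycle."""

    def __init__(self, name, polling_interval=5):
        self.name = name
        self.polling_interval = polling_interval
        self.status = "Active"  # newly created schedules start out active

    def disable(self):
        # Mirrors Schedule.disable: the schedule becomes unavailable to run.
        self.status = "Disabled"

    def enable(self):
        # Mirrors Schedule.enable: the schedule becomes available to run.
        self.status = "Active"

    def update(self, name=None, polling_interval=None, status=None):
        # Mirrors Schedule.update: only the fields that are provided change.
        if name is not None:
            self.name = name
        if polling_interval is not None:
            self.polling_interval = polling_interval
        if status is not None:
            self.status = status

sched = FakeSchedule("TestSchedule")
sched.disable()                     # pause the schedule
sched.update(polling_interval=10)   # change a setting while paused
sched.enable()                      # resume it
```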
create
Create a schedule for a pipeline.
Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.
static create(workspace, name, pipeline_id, experiment_name, recurrence=None, description=None, pipeline_parameters=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=5, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace object this Schedule will belong to. |
| name (Required) | The name of the Schedule. |
| pipeline_id (Required) | The ID of the pipeline the schedule will submit. |
| experiment_name (Required) | The name of the experiment the schedule will submit runs on. |
| recurrence | The schedule recurrence of the pipeline. Default value: None |
| description | The description of the schedule. Default value: None |
| pipeline_parameters | A dictionary of parameters to assign new values, as {param name: param value}. Default value: None |
| wait_for_provisioning | Whether to wait for provisioning of the schedule to complete. Default value: False |
| wait_timeout | The number of seconds to wait before timing out. Default value: 3600 |
| datastore | The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported. Cannot be used together with a recurrence. Default value: None |
| polling_interval | How long, in minutes, between polls for modified/added blobs. Only supported for DataStore schedules. Default value: 5 |
| data_path_parameter_name | The name of the data path pipeline parameter to set with the changed blob path. Only supported for DataStore schedules. Default value: None |
| continue_on_step_failure | Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this overrides the continue_on_step_failure setting of the Pipeline. Default value: None |
| path_on_datastore | Optional. The path on the datastore to monitor for modified/added blobs. Note: path_on_datastore is relative to the datastore's container, so the schedule actually monitors container/path_on_datastore. If None, the datastore container itself is monitored. Additions/modifications made in a subfolder of path_on_datastore are not monitored. Only supported for DataStore schedules. Default value: None |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| Schedule | The created schedule. |
create_for_pipeline_endpoint
Create a schedule for a pipeline endpoint.
Specify recurrence for a time-based schedule or specify a Datastore, (optional) polling_interval, and (optional) data_path_parameter_name to create a schedule which will monitor the Datastore location for modifications/additions.
static create_for_pipeline_endpoint(workspace, name, pipeline_endpoint_id, experiment_name, recurrence=None, description=None, pipeline_parameters=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=5, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace object this Schedule will belong to. |
| name (Required) | The name of the Schedule. |
| pipeline_endpoint_id (Required) | The ID of the pipeline endpoint the schedule will submit. |
| experiment_name (Required) | The name of the experiment the schedule will submit runs on. |
| recurrence | The schedule recurrence of the pipeline. Default value: None |
| description | The description of the schedule. Default value: None |
| pipeline_parameters | A dictionary of parameters to assign new values, as {param name: param value}. Default value: None |
| wait_for_provisioning | Whether to wait for provisioning of the schedule to complete. Default value: False |
| wait_timeout | The number of seconds to wait before timing out. Default value: 3600 |
| datastore | The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported. Cannot be used together with a recurrence. Default value: None |
| polling_interval | How long, in minutes, between polls for modified/added blobs. Only supported for DataStore schedules. Default value: 5 |
| data_path_parameter_name | The name of the data path pipeline parameter to set with the changed blob path. Only supported for DataStore schedules. Default value: None |
| continue_on_step_failure | Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this overrides the continue_on_step_failure setting of the Pipeline. Default value: None |
| path_on_datastore | Optional. The path on the datastore to monitor for modified/added blobs. Note: path_on_datastore is relative to the datastore's container, so the schedule actually monitors container/path_on_datastore. If None, the datastore container itself is monitored. Additions/modifications made in a subfolder of path_on_datastore are not monitored. Only supported for DataStore schedules. Default value: None |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| Schedule | The created schedule. |
disable
Set the schedule to 'Disabled' and unavailable to run.
disable(wait_for_provisioning=False, wait_timeout=3600)
Parameters
| Name | Description |
|---|---|
| wait_for_provisioning | Whether to wait for provisioning of the schedule to complete. Default value: False |
| wait_timeout | Number of seconds to wait before timing out. Default value: 3600 |
enable
Set the schedule to 'Active' and available to run.
enable(wait_for_provisioning=False, wait_timeout=3600)
Parameters
| Name | Description |
|---|---|
| wait_for_provisioning | Whether to wait for provisioning of the schedule to complete. Default value: False |
| wait_timeout | Number of seconds to wait before timing out. Default value: 3600 |
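When wait_for_provisioning=True, the SDK effectively polls until the schedule reaches its target status or wait_timeout elapses. A simplified sketch of that pattern follows; it is hypothetical (the get_status callable here is a fake stand-in, not an SDK API) and only illustrates the timeout semantics:

```python
import time

def wait_for_provisioning(get_status, target, wait_timeout=3600, poll_seconds=0.01):
    """Poll `get_status()` until it returns `target` or `wait_timeout` seconds elapse."""
    deadline = time.monotonic() + wait_timeout
    while time.monotonic() < deadline:
        if get_status() == target:
            return True
        time.sleep(poll_seconds)
    return False  # timed out before reaching the target status

# Fake status source that reaches 'Disabled' on the third poll.
statuses = iter(["Provisioning", "Provisioning", "Disabled"])
ok = wait_for_provisioning(lambda: next(statuses), "Disabled")
```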
get
Get the schedule with the given ID.
static get(workspace, id, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace the schedule was created on. |
| id (Required) | The ID of the schedule. |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| Schedule | The Schedule object. |
get_all
Get all schedules in the current workspace.
DEPRECATED: This method is being deprecated in favor of the list method.
static get_all(workspace, active_only=True, pipeline_id=None, pipeline_endpoint_id=None, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace. |
| active_only | If True, only return schedules which are currently active. Only applies if no pipeline ID is provided. Default value: True |
| pipeline_id | If provided, only return schedules for the pipeline with the given ID. Default value: None |
| pipeline_endpoint_id | If provided, only return schedules for the pipeline endpoint with the given ID. Default value: None |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| list | A list of Schedule objects. |
get_last_pipeline_run
Fetch the last pipeline run submitted by the schedule. Returns None if no runs have been submitted.
get_last_pipeline_run()
Returns
| Type | Description |
|---|---|
| PipelineRun | The last pipeline run. |
get_pipeline_runs
Fetch the pipeline runs that were generated from the schedule.
get_pipeline_runs()
Returns
| Type | Description |
|---|---|
| list | A list of PipelineRun objects. |
get_schedules_for_pipeline_endpoint_id
Get all schedules for the given pipeline endpoint id.
static get_schedules_for_pipeline_endpoint_id(workspace, pipeline_endpoint_id, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace. |
| pipeline_endpoint_id (Required) | The pipeline endpoint ID. |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| list | A list of Schedule objects. |
get_schedules_for_pipeline_id
Get all schedules for the given pipeline id.
static get_schedules_for_pipeline_id(workspace, pipeline_id, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace. |
| pipeline_id (Required) | The pipeline ID. |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| list | A list of Schedule objects. |
list
Get all schedules in the current workspace.
static list(workspace, active_only=True, pipeline_id=None, pipeline_endpoint_id=None, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace. |
| active_only | If True, only return schedules which are currently active. Only applies if no pipeline ID is provided. Default value: True |
| pipeline_id | If provided, only return schedules for the pipeline with the given ID. Default value: None |
| pipeline_endpoint_id | If provided, only return schedules for the pipeline endpoint with the given ID. Default value: None |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| list | A list of Schedule objects. |
load_yaml
Load and read the YAML file to get schedule parameters.
A YAML file is another way to pass the parameters needed to create a schedule.
static load_yaml(workspace, filename, _workflow_provider=None, _service_endpoint=None)
Parameters
| Name | Description |
|---|---|
| workspace (Required) | The workspace. |
| filename (Required) | The YAML filename, including its location. |
| _workflow_provider (<xref:azureml.pipeline.core._aeva_provider._AevaWorkflowProvider>) | The workflow provider. Default value: None |
| _service_endpoint | The service endpoint. Default value: None |
Returns

| Type | Description |
|---|---|
| dict | A dictionary of Schedule parameters and values. |
Remarks
Two types of YAML are supported for Schedules: one specifies the recurrence information used to create a time-based schedule, and the other specifies the datastore information used to create a datastore-monitoring schedule.
For example, to create a Schedule which submits a Pipeline on a recurrence:
from azureml.pipeline.core import Schedule
schedule_info = Schedule.load_yaml(workspace=workspace,
                                   filename='./yaml/test_schedule_with_recurrence.yaml')
schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                           experiment_name="helloworld", recurrence=schedule_info.get("recurrence"),
                           description=schedule_info.get("description"))
Sample YAML file test_schedule_with_recurrence.yaml:
Schedule:
    description: "Test create with recurrence"
    recurrence:
        frequency: Week # Can be "Minute", "Hour", "Day", "Week", or "Month".
        interval: 1 # How often the schedule fires.
        start_time: 2019-06-07T10:50:00
        time_zone: UTC
        hours:
        - 1
        minutes:
        - 0
        time_of_day: null
        week_days:
        - Friday
    pipeline_parameters: {'a': 1}
    wait_for_provisioning: True
    wait_timeout: 3600
    datastore_name: ~
    polling_interval: ~
    data_path_parameter_name: ~
    continue_on_step_failure: ~
    path_on_datastore: ~
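In the sample above, unused fields are set to ~ (YAML null), and load_yaml returns a plain dictionary. When forwarding such a dictionary to Schedule.create, it can help to drop the null entries so the method's own defaults apply. The helper below is hypothetical (the dictionary literal stands in for a parsed YAML result; drop_nulls is not an SDK function):

```python
# Hypothetical stand-in for the dictionary a parsed schedule YAML might yield.
schedule_info = {
    "description": "Test create with recurrence",
    "recurrence": {"frequency": "Week", "interval": 1},
    "wait_for_provisioning": True,
    "wait_timeout": 3600,
    "datastore_name": None,   # was '~' (null) in the YAML
    "polling_interval": None,
    "data_path_parameter_name": None,
    "path_on_datastore": None,
}

def drop_nulls(params):
    """Return a copy of `params` without None-valued entries."""
    return {key: value for key, value in params.items() if value is not None}

# Only the fields that were actually set in the YAML survive.
kwargs = drop_nulls(schedule_info)
```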
To create a Schedule which submits a Pipeline based on a datastore:
from azureml.pipeline.core import Schedule
schedule_info = Schedule.load_yaml(workspace=workspace,
                                   filename='./yaml/test_schedule_with_datastore.yaml')
schedule = Schedule.create(workspace, name="TestSchedule", pipeline_id="pipeline_id",
                           experiment_name="helloworld", datastore=schedule_info.get("datastore_name"),
                           polling_interval=schedule_info.get("polling_interval"),
                           data_path_parameter_name=schedule_info.get("data_path_parameter_name"),
                           continue_on_step_failure=schedule_info.get("continue_on_step_failure"),
                           path_on_datastore=schedule_info.get("path_on_datastore"))
update
Update the schedule.
update(name=None, description=None, recurrence=None, pipeline_parameters=None, status=None, wait_for_provisioning=False, wait_timeout=3600, datastore=None, polling_interval=None, data_path_parameter_name=None, continue_on_step_failure=None, path_on_datastore=None)
Parameters
| Name | Description |
|---|---|
| name | The new name of the Schedule. Default value: None |
| recurrence | The new schedule recurrence of the pipeline. Default value: None |
| description | The new description of the schedule. Default value: None |
| pipeline_parameters | A dictionary of parameters to assign new values, as {param name: param value}. Default value: None |
| status | The new status of the schedule: 'Active' or 'Disabled'. Default value: None |
| wait_for_provisioning | Whether to wait for provisioning of the schedule to complete. Default value: False |
| wait_timeout | The number of seconds to wait before timing out. Default value: 3600 |
| datastore | The Datastore to monitor for modified/added blobs. Note: VNET Datastores are not supported. Default value: None |
| polling_interval | How long, in minutes, between polls for modified/added blobs. Default is 5 minutes. Default value: None |
| data_path_parameter_name | The name of the data path pipeline parameter to set with the changed blob path. Default value: None |
| continue_on_step_failure | Whether to continue execution of other steps in the submitted PipelineRun if a step fails. If provided, this overrides the continue_on_step_failure setting of the Pipeline. Default value: None |
| path_on_datastore | Optional. The path on the datastore to monitor for modified/added blobs. Note: path_on_datastore is relative to the datastore's container, so the schedule actually monitors container/path_on_datastore. If None, the datastore container itself is monitored. Additions/modifications made in a subfolder of path_on_datastore are not monitored. Only supported for DataStore schedules. Default value: None |
Attributes
continue_on_step_failure
Get the value of the continue_on_step_failure setting.
Returns

| Type | Description |
|---|---|
| bool | The value of the continue_on_step_failure setting. |
data_path_parameter_name
Get the name of the data path pipeline parameter to set with the changed blob path.
Returns
| Type | Description |
|---|---|
| str | The data path parameter name. |
datastore_name
Get the name of the Datastore used for the schedule.
Returns
| Type | Description |
|---|---|
| str | The Datastore name. |
description
id
name
path_on_datastore
Get the path on the datastore that the schedule monitors.
Returns
| Type | Description |
|---|---|
| str | The path on the datastore. |
pipeline_endpoint_id
pipeline_id
polling_interval
Get how long, in minutes, between polling for modified/added blobs.
Returns
| Type | Description |
|---|---|
| int | The polling interval, in minutes. |