Een bestaande pijplijn uitvoeren met Workflow Orchestration Manager
VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics
Tip
Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .
Notitie
Workflow Orchestration Manager wordt mogelijk gemaakt door Apache Airflow.
Notitie
Werkstroomindelingsbeheer voor Azure Data Factory is afhankelijk van de open source Apache Airflow-toepassing. Documentatie en meer zelfstudies voor Airflow vindt u op de Apache Airflow-documentatie of communitypagina's.
Data Factory-pijplijnen bieden meer dan 100 connectors voor gegevensbronnen die schaalbare en betrouwbare gegevensintegratie/gegevensstromen bieden. Er zijn scenario's waarin u een bestaande data factory-pijplijn wilt uitvoeren vanuit uw Apache Airflow DAG. In deze zelfstudie leert u hoe u dat doet.
Vereisten
- Azure-abonnement. Als u nog geen abonnement op Azure hebt, maakt u een gratis Azure-account voordat u begint.
- Azure-opslagaccount. Als u geen opslagaccount hebt, raadpleegt u het artikel Een opslagaccount maken om een account te maken. Zorg ervoor dat het opslagaccount alleen toegang toestaat vanuit geselecteerde netwerken.
- Azure Data Factory-pijplijn. U kunt een van de zelfstudies volgen en een nieuwe data factory-pijplijn maken voor het geval u er nog geen hebt, of een pijplijn maken met één selectie in Aan de slag en uw eerste data factory-pijplijn uitproberen.
- Een service-principal instellen. U moet een nieuwe service-principal maken of een bestaande service-principal gebruiken en deze toestemming geven om de pijplijn uit te voeren (bijvoorbeeld de rol van inzender in de data factory waar de bestaande pijplijnen bestaan), zelfs als de werkstroomindelingsbeheeromgeving en de pijplijnen in dezelfde data factory bestaan. U moet de client-id en het clientgeheim (API-sleutel) van de service-principal ophalen.
Stappen
Maak een nieuw Python-bestand adf.py met de onderstaande inhoud:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
U moet de verbinding maken met de ui-beheerder van werkstroomindelingsbeheer - Verbindingen ->> '+' -> Kies 'Verbindingstype' als 'Azure Data Factory' en vul vervolgens uw client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name en pipeline_name in.
Upload het adf.py bestand naar uw blobopslag in een map met de naam DAGS.
Importeer de DAGS-map in uw werkstroomindelingsbeheeromgeving. Als u er nog geen hebt, maakt u een nieuwe