Pipelines e atividades no Azure Data Factory e no Azure Synapse Analytics
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Importante
O suporte para o Azure Machine Learning Studio (clássico) terminará em 31 de agosto de 2024. Recomendamos que faça a transição para o Azure Machine Learning até essa data.
A partir de 1º de dezembro de 2021, não é possível criar novos recursos (clássicos) do Machine Learning Studio (espaço de trabalho e plano de serviço Web). Até 31 de agosto de 2024, você pode continuar a usar os experimentos e serviços Web existentes do Machine Learning Studio (clássicos). Para obter mais informações, consulte:
- Migrar para o Azure Machine Learning a partir do Machine Learning Studio (clássico)
- O que é o Azure Machine Learning?
A documentação do Machine Learning Studio (clássica) está sendo desativada e pode não ser atualizada no futuro.
Este artigo ajuda você a entender pipelines e atividades no Azure Data Factory e no Azure Synapse Analytics e usá-los para construir fluxos de trabalho controlados por dados de ponta a ponta para seus cenários de movimentação e processamento de dados.
Descrição geral
Um Data Factory ou Synapse Workspace pode ter um ou mais pipelines. Um pipeline é um agrupamento lógico de atividades que, em conjunto, executam uma tarefa. Por exemplo, um pipeline pode conter um conjunto de atividades que ingerem e limpam dados de registos e, em seguida, iniciam um fluxo de dados de mapeamento para analisar os dados de registo. O pipeline permite-lhe gerir as atividades como um conjunto, em vez de cada uma individualmente. Implementa e agenda o pipeline, em vez das atividades de forma independente.
As atividades num pipeline definem as ações a executar nos seus dados. Por exemplo, pode utilizar uma atividade de cópia para copiar dados do SQL Server para um Armazenamento de Blobs do Azure. Em seguida, use uma atividade de fluxo de dados ou uma atividade do Bloco de Anotações Databricks para processar e transformar dados do armazenamento de blob em um pool do Azure Synapse Analytics sobre o qual as soluções de relatórios de business intelligence são criadas.
O Azure Data Factory e o Azure Synapse Analytics têm três agrupamentos de atividades: atividades de movimentação de dados, atividades de transformação de dados e atividades de controle. Uma atividade pode ter zero ou mais conjuntos de dados de entrada e produzir um ou mais conjuntos de dados de saída. O diagrama a seguir mostra a relação entre pipeline, atividade e conjunto de dados:
Um conjunto de dados de entrada representa a entrada para uma atividade no pipeline e um conjunto de dados de saída representa a saída para a atividade. Os conjuntos de dados identificam dados dentro de diferentes arquivos de dados, como tabelas, ficheiros, pastas e documentos. Depois de criar um conjunto de dados, pode utilizá-lo com atividades num pipeline. Por exemplo, um conjunto de dados pode ser um conjunto de dados de entrada/saída de uma atividade Cópia ou de uma atividade HDInsightHive. Para obter mais informações sobre os conjuntos de dados, veja o artigo Datasets in Azure Data Factory (Conjuntos de Dados no Azure Data Factory).
Nota
Há um limite flexível padrão de no máximo 80 atividades por pipeline, que inclui atividades internas para contêineres.
Atividades de movimento de dados
A Atividade de Cópia no Data Factory copia os dados de um arquivo de dados de origem para um arquivo de dados sink. O Data Factory suporta os arquivos de dados apresentados na tabela nesta secção. Os dados de qualquer origem podem ser escritos em qualquer sink.
Para obter mais informações, veja o artigo Copy Activity - Overview (Atividade de Cópia - Descrição Geral).
Clique num arquivo de dados para saber como copiar dados de e para esse arquivo.
Nota
Se um conector estiver marcado como Pré-visualização, pode experimentá-lo e enviar-nos comentários. Se quiser realizar uma dependência em conectores de pré-visualização na sua solução, contacte o suporte do Azure.
Atividades de transformação de dados
O Azure Data Factory e o Azure Synapse Analytics dão suporte às seguintes atividades de transformação que podem ser adicionadas individualmente ou encadeadas com outra atividade.
Para obter mais informações, veja o artigo Data Transformation Activities (Atividades de Transformação de Dados).
Atividade de transformação de dados | Ambiente de computação |
---|---|
Fluxo de Dados | Clusters do Apache Spark gerenciados pelo Azure Data Factory |
Função do Azure | Funções do Azure |
Hive | HDInsight [Hadoop] |
Pig | HDInsight [Hadoop] |
MapReduce | HDInsight [Hadoop] |
Hadoop Streaming | HDInsight [Hadoop] |
Spark | HDInsight [Hadoop] |
Atividades do ML Studio (clássico): Execução em lote e recurso de atualização | VM do Azure |
Procedimento Armazenado | Azure SQL, Azure Synapse Analytics ou SQL Server |
U-SQL | Azure Data Lake Analytics |
Atividade Personalizada | Azure Batch |
Databricks Notebook | Azure Databricks |
Atividade do Databricks Jar | Azure Databricks |
Atividade Python do Databricks | Azure Databricks |
Atividade do Caderno de Sinapse | Azure Synapse Analytics |
Atividades de fluxo de controlo
São suportadas as atividades de fluxo de controlo abaixo:
Atividade de controlo | Description |
---|---|
Acrescentar variável | Adicione um valor a uma variável de matriz existente. |
Executar pipeline | A atividade Executar pipeline permite que um pipeline do Data Factory ou Synapse invoque outro pipeline. |
Filtro | Aplicar uma expressão de filtro a uma matriz de entrada |
Para cada | A atividade ForEach define um fluxo de controlo de repetição no seu pipeline. Esta atividade é utilizada para iterar uma coleção e executa atividades especificadas em ciclo. A implementação de ciclo desta atividade é semelhante à estrutura de ciclo Foreach nas linguagens de programação. |
Obter metadados | A atividade GetMetadata pode ser usada para recuperar metadados de quaisquer dados em um pipeline do Data Factory ou Synapse. |
Atividade Se Condição | Se Condição pode ser utilizada com base em ramos numa condição que é avaliada como verdadeira ou falsa. A atividade Se Condição disponibiliza a mesma funcionalidade que as instruções “se” fornecem nas linguagens de programação. Ele avalia um conjunto de atividades quando a condição é avaliada e true outro conjunto de atividades quando a condição avalia como false. |
Atividade de Pesquisa | A atividade de Pesquisa pode ser utilizada para ler ou procurar registos/nomes de tabelas/valores em qualquer origem externa. Este resultado pode ser ainda referenciado por atividades subsequentes. |
Definir variável | Defina o valor de uma variável existente. |
Atividade Until | Implementa o ciclo Do-Until que é semelhante à estrutura de ciclo Do-Until nas linguagens de programação. Executa um conjunto de atividades num ciclo até que a condição associada às atividades seja avaliada como verdadeira. Você pode especificar um valor de tempo limite para a atividade até. |
Atividade de validação | Certifique-se de que um pipeline só continue a execução se existir um conjunto de dados de referência, atender a um critério especificado ou se um tempo limite tiver sido atingido. |
Atividade Aguardar | Quando você usa uma atividade de espera em um pipeline, o pipeline aguarda o tempo especificado antes de continuar com a execução das atividades subsequentes. |
Atividade Web | A Atividade da Web pode ser usada para chamar um ponto de extremidade REST personalizado a partir de um pipeline. Pode transmitir conjuntos de dados e serviços ligados aos quais a atividade tem acesso e que pode consumir. |
Atividade de Webhook | Usando a atividade webhook, chame um ponto de extremidade e passe uma URL de retorno de chamada. A execução do pipeline aguarda que o retorno de chamada seja invocado antes de prosseguir para a próxima atividade. |
Criando um pipeline com a interface do usuário
Para criar um novo pipeline, navegue até a guia Autor no Data Factory Studio (representado pelo ícone de lápis), clique no sinal de adição e escolha Pipeline no menu e Pipeline novamente no submenu.
O data factory exibirá o editor de pipeline onde você pode encontrar:
- Todas as atividades que podem ser usadas dentro do pipeline.
- A tela do editor de pipeline, onde as atividades aparecerão quando adicionadas ao pipeline.
- O painel de configurações de pipeline, incluindo parâmetros, variáveis, configurações gerais e saída.
- O painel de propriedades do pipeline, onde o nome do pipeline, a descrição opcional e as anotações podem ser configurados. Esse painel também mostrará todos os itens relacionados ao pipeline dentro do data factory.
JSON do Pipeline
Eis como os pipelines são definidos no formato JSON:
{
"name": "PipelineName",
"properties":
{
"description": "pipeline description",
"activities":
[
],
"parameters": {
},
"concurrency": <your max pipeline concurrency>,
"annotations": [
]
}
}
Etiqueta | Description | Type | Necessário |
---|---|---|---|
nome | Nome do pipeline. Especifique um nome que represente a ação que o pipeline realiza.
|
Cadeia (de carateres) | Sim |
descrição | Especifique o texto que descreve para o que é utilizado o pipeline. | String | Não |
atividades | A secção atividades pode ter uma ou mais atividades definidas na mesma. Veja a secção JSON da Atividade para obter detalhes sobre o elemento JSON das atividades. | Matriz | Sim |
parâmetros | A secção parâmetros pode ter um ou mais parâmetros definidos no pipeline, tornando-o flexível para reutilização. | Listagem | Não |
simultaneidade | O número máximo de execuções simultâneas que o pipeline pode ter. Por padrão, não há um máximo. Se o limite de simultaneidade for atingido, execuções de pipeline adicionais serão enfileiradas até que as anteriores sejam concluídas | Número | Não |
anotações | Uma lista de tags associadas ao pipeline | Matriz | Não |
JSON da Atividade
A secção atividades pode ter uma ou mais atividades definidas na mesma. Existem dois tipos principais de atividades: atividades de Execução e de Controlo.
Atividades de Execução
As atividades de execução incluem atividades de movimento de dados e de transformação de dados. Têm a estrutura de nível superior abaixo:
{
"name": "Execution Activity Name",
"description": "description",
"type": "<ActivityType>",
"typeProperties":
{
},
"linkedServiceName": "MyLinkedService",
"policy":
{
},
"dependsOn":
{
}
}
A tabela seguinte descreve as propriedades na definição JSON da atividade:
Etiqueta | Description | Obrigatório |
---|---|---|
nome | Nome da atividade. Especifique um nome que represente a ação que a atividade realiza.
|
Sim |
descrição | Texto que descreve para o que é utilizada a atividade | Sim |
tipo | Tipo de atividade. Veja os diferentes tipos de atividades nas secções Atividades de Movimento de Dados, Atividades de Transformação de Dados e Atividades de Controlo. | Sim |
linkedServiceName | Nome do serviço ligado utilizado pela atividade. Uma atividade pode exigir que você especifique o serviço vinculado vinculado ao ambiente de computação necessário. |
Sim para Atividade do HDInsight, Atividade de pontuação em lote do ML Studio (clássica), Atividade de procedimento armazenado. Não para todas as outras. |
typeProperties | As propriedades na secção typeProperties dependem de cada tipo de atividade. Para ver as propriedades do tipo de uma atividade, clique nas ligações para a atividade na secção anterior. | Não |
política | Políticas que afetam o comportamento de runtime da atividade. Esta propriedade inclui um tempo limite e um comportamento de nova tentativa. Se não for especificado, os valores padrão serão usados. Para obter mais informações, veja a secção Política das atividades. | Não |
dependsOn | Esta propriedade é utilizada para definir as dependências de atividade e de que forma as atividades subsequentes dependem de atividades anteriores. Para obter mais informações, veja Dependência das atividades | Não |
Política de atividade
As políticas afetam o comportamento em tempo de execução de uma atividade, fornecendo opções de configuração. As políticas das Atividades só estão disponíveis para as atividades de execução.
Definição JSON da política de atividade
{
"name": "MyPipelineName",
"properties": {
"activities": [
{
"name": "MyCopyBlobtoSqlActivity",
"type": "Copy",
"typeProperties": {
...
},
"policy": {
"timeout": "00:10:00",
"retry": 1,
"retryIntervalInSeconds": 60,
"secureOutput": true
}
}
],
"parameters": {
...
}
}
}
Nome JSON | Description | Valores Permitidos | Necessário |
---|---|---|---|
tempo limite | Especifica o tempo limite para a execução da atividade. | Timespan | N.º O tempo limite padrão é de 12 horas, mínimo de 10 minutos. |
retry | Número máximo de repetições | Número inteiro | N.º A predefinição é 0 |
retryIntervalInSeconds | O atraso entre as tentativas de repetição em segundos | Número inteiro | N.º O padrão é 30 segundos |
secureOutput | Quando definido como true, a saída da atividade é considerada segura e não é registrada para monitoramento. | Boolean | N.º A predefinição é falsa. |
Atividade de controlo
As atividades de controlo têm a estrutura de nível superior seguinte:
{
"name": "Control Activity Name",
"description": "description",
"type": "<ActivityType>",
"typeProperties":
{
},
"dependsOn":
{
}
}
Etiqueta | Description | Obrigatório |
---|---|---|
nome | Nome da atividade. Especifique um nome que represente a ação que a atividade realiza.
|
Sim |
descrição | Texto que descreve para o que é utilizada a atividade | Sim |
tipo | Tipo de atividade. Veja os diferentes tipos de atividades nas secções sobre as atividades de movimento de dados, as atividades de transformação de dados e as atividades de controlo. | Sim |
typeProperties | As propriedades na secção typeProperties dependem de cada tipo de atividade. Para ver as propriedades do tipo de uma atividade, clique nas ligações para a atividade na secção anterior. | Não |
dependsOn | Esta propriedade é utilizada para definir a Dependência da Atividade e de que forma as atividades subsequentes dependem de atividades anteriores. Para obter mais informações, veja Dependência das atividades. | Não |
Dependência das atividades
A Dependência de Atividade define como as atividades subsequentes dependem de atividades anteriores, determinando a condição de continuar ou não executando a próxima tarefa. Uma atividade pode depender de uma ou várias atividades anteriores com condições de dependência diferentes.
As condições de dependência diferentes são: Bem-sucedida, Falha, Ignorada, Concluída.
Por exemplo, se um pipeline tiver Atividade A -> Atividade B, os diferentes cenários que podem acontecer são:
- A Atividade B tem a condição de dependência de a Atividade A ser bem-sucedida - a Atividade B só é executada se o estado final de A for “bem-sucedida”
- A Atividade B tem a condição de dependência de a Atividade A falhar - a Atividade B só é executada se o estado final de A for “falha”
- A Atividade B tem a condição de dependência de a Atividade A ser concluída - a Atividade B só é executada se o estado final de A for “concluída”
- A Atividade B tem uma condição de dependência da Atividade A com ignorado: A Atividade B é executada se a Atividade A tiver um status final de ignorado. Ignorado ocorre no cenário de Atividade X -> Atividade Y -> Atividade Z, onde cada atividade é executada somente se a atividade anterior for bem-sucedida. Se a Atividade X falhar, a Atividade Y terá um status de "Ignorado" porque nunca será executada. Da mesma forma, a Atividade Z também tem um status de "Ignorado".
Exemplo: a Atividade 2 depende de a Atividade 1 ser bem-sucedida
{
"name": "PipelineName",
"properties":
{
"description": "pipeline description",
"activities": [
{
"name": "MyFirstActivity",
"type": "Copy",
"typeProperties": {
},
"linkedServiceName": {
}
},
{
"name": "MySecondActivity",
"type": "Copy",
"typeProperties": {
},
"linkedServiceName": {
},
"dependsOn": [
{
"activity": "MyFirstActivity",
"dependencyConditions": [
"Succeeded"
]
}
]
}
],
"parameters": {
}
}
}
Pipeline de cópia de exemplo
No pipeline de exemplo seguinte, existe uma atividade do tipo Cópia na secção activities. Neste exemplo, a atividade de cópia copia dados de um armazenamento de Blob do Azure para um banco de dados no Banco de Dados SQL do Azure.
{
"name": "CopyPipeline",
"properties": {
"description": "Copy data from a blob to Azure SQL table",
"activities": [
{
"name": "CopyFromBlobToSQL",
"type": "Copy",
"inputs": [
{
"name": "InputDataset"
}
],
"outputs": [
{
"name": "OutputDataset"
}
],
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 10000,
"writeBatchTimeout": "60:00:00"
}
},
"policy": {
"retry": 2,
"timeout": "01:00:00"
}
}
]
}
}
Tenha em conta os seguintes pontos:
- Na secção atividades, existe apenas uma atividade cujo type está definido como Copy.
- A entrada da atividade está definida como InputDataset e a saída como OutputDataset. Veja o artigo Conjuntos de dados para saber como definir conjuntos de dados em JSON.
- Na secção typeProperties, BlobSource está especificado como o tipo de origem e SqlSink como o tipo de sink. Na secção atividades de movimento de dados, clique no arquivo de dados que pretende utilizar como origem ou sink para saber mais sobre como mover dados de/para esse arquivo.
Para obter um passo a passo completo sobre como criar esse pipeline, consulte Guia de início rápido: criar um Data Factory.
Pipeline de transformação de exemplos
No pipeline de exemplo seguinte, existe uma atividade do tipo HDInsightHive na secção activities. Neste exemplo, a atividade Hive do HDInsight transforma dados de um armazenamento de Blobs do Azure mediante a execução de um ficheiro de script de Hive num cluster do Hadoop para o Azure HDInsight.
{
"name": "TransformPipeline",
"properties": {
"description": "My first Azure Data Factory pipeline",
"activities": [
{
"type": "HDInsightHive",
"typeProperties": {
"scriptPath": "adfgetstarted/script/partitionweblogs.hql",
"scriptLinkedService": "AzureStorageLinkedService",
"defines": {
"inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/inputdata",
"partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/partitioneddata"
}
},
"inputs": [
{
"name": "AzureBlobInput"
}
],
"outputs": [
{
"name": "AzureBlobOutput"
}
],
"policy": {
"retry": 3
},
"name": "RunSampleHiveActivity",
"linkedServiceName": "HDInsightOnDemandLinkedService"
}
]
}
}
Tenha em conta os seguintes pontos:
- Na secção “activities”, existe apenas uma atividade cujo type está definido como HDInsightHive.
- O arquivo de script do Hive, partitionweblogs.hql, é armazenado na conta de Armazenamento do Azure (especificada pelo scriptLinkedService, chamado AzureStorageLinkedService) e na pasta de script no contêiner
adfgetstarted
. - A secção
defines
é utilizada para especificar as definições de runtime que são transmitidas ao script do Hive como valores de configuração do Hive (por exemplo, ${hiveconf:inputtable}
e${hiveconf:partitionedtable}
).
A secção typeProperties é diferente para cada atividade de transformação. Para saber mais sobre as propriedades de tipos suportadas para atividades de transformação, clique na atividade de transformação em Atividades de transformação de dados.
Para obter instruções completas para criar este pipeline, veja Tutorial: transformar dados com o Spark.
Múltiplas atividades num pipeline
Os dois pipelines de exemplo anteriores só contêm uma atividade. Pode ter mais de uma atividade num pipeline. Se você tiver várias atividades em um pipeline e as atividades subsequentes não dependerem de atividades anteriores, as atividades poderão ser executadas em paralelo.
Pode utilizar a dependência das atividades para encadear duas atividades; a dependência define de que forma é que as atividades subsequentes dependem das atividades anteriores, determinando a condição de continuação da execução da tarefa seguinte. Uma atividade pode depender de uma ou várias atividades anteriores com condições de dependência diferentes.
Agendamento de pipelines
Os pipelines são agendados por acionadores. Existem diferentes tipos de gatilhos (gatilho Scheduler, que permite que pipelines sejam acionados em uma programação de relógio de parede, bem como o gatilho manual, que aciona pipelines sob demanda). Para obter mais informações sobre os acionadores, veja o artigo Pipeline execution and triggers (Execução e acionadores de pipelines).
Para que o seu acionador arranque uma execução de pipeline, tem de incluir uma referência ao pipeline desse pipeline na definição do acionador. Os pipelines e os acionadores têm uma relação “n-m” (muitos para muitos). Múltiplos acionadores podem arrancar um pipeline individual e o mesmo acionador pode arrancar vários pipelines. Quando o acionador estiver definido, tem de iniciá-lo para que o mesmo comece a acionar o pipeline. Para obter mais informações sobre os acionadores, veja o artigo Pipeline execution and triggers (Execução e acionadores de pipelines).
Por exemplo, digamos que você tenha um gatilho do Agendador, "Gatilho A", que eu deseje iniciar meu pipeline, "MyCopyPipeline". Você define o gatilho, conforme mostrado no exemplo a seguir:
Definição do Acionador A
{
"name": "TriggerA",
"properties": {
"type": "ScheduleTrigger",
"typeProperties": {
...
}
},
"pipeline": {
"pipelineReference": {
"type": "PipelineReference",
"referenceName": "MyCopyPipeline"
},
"parameters": {
"copySourceName": "FileSource"
}
}
}
}