Atividade de fluxo de dados no Azure Data Factory e no Azure Synapse Analytics
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Use a atividade Fluxo de Dados para transformar e mover dados por meio de fluxos de dados de mapeamento. Se você é novo em fluxos de dados, consulte Visão geral do fluxo de dados de mapeamento
Criar uma atividade de fluxo de dados com a interface do usuário
Para usar uma atividade de Fluxo de Dados em um pipeline, conclua as seguintes etapas:
Procure Fluxo de Dados no painel Atividades do pipeline e arraste uma atividade de Fluxo de Dados para a tela do pipeline.
Selecione a nova atividade Fluxo de Dados na tela, se ainda não estiver selecionada, e a guia Configurações para editar seus detalhes.
A chave de ponto de verificação é usada para definir o ponto de verificação quando o fluxo de dados é usado para captura de dados alterados. Você pode substituí-lo. As atividades de fluxo de dados usam um valor guid como chave de ponto de verificação em vez de "nome do pipeline + nome da atividade" para que ele possa sempre acompanhar o estado de captura de dados de alteração do cliente, mesmo que haja ações de renomeação. Toda a atividade de fluxo de dados existente usa a chave de padrão antiga para compatibilidade com versões anteriores. A opção de chave de ponto de verificação após a publicação de uma nova atividade de fluxo de dados com recurso de fluxo de dados habilitado para captura de dados de alteração é mostrada abaixo.
Selecione um fluxo de dados existente ou crie um novo usando o botão Novo. Selecione outras opções conforme necessário para concluir a configuração.
Sintaxe
{
"name": "MyDataFlowActivity",
"type": "ExecuteDataFlow",
"typeProperties": {
"dataflow": {
"referenceName": "MyDataFlow",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "Fine",
"runConcurrently": true,
"continueOnError": true,
"staging": {
"linkedService": {
"referenceName": "MyStagingLinkedService",
"type": "LinkedServiceReference"
},
"folderPath": "my-container/my-folder"
},
"integrationRuntime": {
"referenceName": "MyDataFlowIntegrationRuntime",
"type": "IntegrationRuntimeReference"
}
}
Propriedades do tipo
Property | Description | Valores permitidos | Necessário |
---|---|---|---|
fluxo de dados | A referência ao fluxo de dados que está sendo executado | DataFlowReference | Sim |
integrationRuntime | O ambiente de computação em que o fluxo de dados é executado. Se não for especificado, o tempo de execução de integração do Azure de autoresolução será usado. | IntegrationRuntimeReference | Não |
compute.coreCount | O número de núcleos usados no cluster de faísca. Só pode ser especificado se o tempo de execução da Integração do Azure de resolução automática for usado | 8, 16, 32, 48, 80, 144, 272 | Não |
compute.computeType | O tipo de computação usado no cluster de faísca. Só pode ser especificado se o tempo de execução da Integração do Azure de resolução automática for usado | "Geral" | Não |
staging.linkedService | Se você estiver usando uma fonte ou coletor do Azure Synapse Analytics, especifique a conta de armazenamento usada para o preparo do PolyBase. Se o seu Armazenamento do Azure estiver configurado com o ponto de extremidade do serviço VNet, você deverá usar a autenticação de identidade gerenciada com "permitir serviço confiável da Microsoft" habilitado na conta de armazenamento, consulte Impacto do uso de Pontos de Extremidade de Serviço VNet com o armazenamento do Azure. Conheça também as configurações necessárias para o Blob do Azure e o Azure Data Lake Storage Gen2 , respectivamente. |
LinkedServiceReference | Somente se o fluxo de dados ler ou gravar em um Azure Synapse Analytics |
staging.folderPath | Se você estiver usando uma fonte ou coletor do Azure Synapse Analytics, o caminho da pasta na conta de armazenamento de blob usado para o preparo do PolyBase | String | Somente se o fluxo de dados ler ou gravar no Azure Synapse Analytics |
traceLevel | Definir o nível de registro da execução da atividade de fluxo de dados | Bom, grosseiro, nenhum | Não |
Dimensione dinamicamente a computação do fluxo de dados em tempo de execução
As propriedades Core Count e Compute Type podem ser definidas dinamicamente para se ajustarem ao tamanho dos dados de origem recebidos em tempo de execução. Use atividades de pipeline como Pesquisa ou Obter Metadados para encontrar o tamanho dos dados do conjunto de dados de origem. Em seguida, use Adicionar Conteúdo Dinâmico nas propriedades da atividade Fluxo de Dados. Você pode escolher tamanhos de computação pequenos, médios ou grandes. Opcionalmente, escolha "Personalizado" e configure os tipos de computação e o número de núcleos manualmente.
Aqui está um breve tutorial em vídeo explicando essa técnica:
Tempo de execução da integração do fluxo de dados
Escolha qual Tempo de Execução de Integração usar para a execução da atividade de Fluxo de Dados. Por predefinição, o serviço utiliza o Azure Integration Runtime de resolução automática com quatro núcleos de trabalho. Este IR tem um tipo de computação de uso geral e é executado na mesma região que sua instância de serviço. Para pipelines operacionalizados, é altamente recomendável que você crie seus próprios Tempos de Execução de Integração do Azure que definem regiões específicas, tipo de computação, contagens de núcleo e TTL para a execução da atividade de fluxo de dados.
Um tipo mínimo de computação de Propósito Geral com uma configuração de 8+8 (16 v-cores totais) e um tempo de vida (TTL) de 10 minutos é a recomendação mínima para a maioria das cargas de trabalho de produção. Ao definir um pequeno TTL, o IR do Azure pode manter um cluster quente que não incorrerá nos vários minutos de hora de início de um cluster frio. Para obter mais informações, consulte Tempo de execução da integração do Azure.
Importante
A seleção do Tempo de Execução de Integração na atividade Fluxo de Dados só se aplica a execuções acionadas do seu pipeline. A depuração do pipeline com fluxos de dados é executada no cluster especificado na sessão de depuração.
PolyBase
Se você estiver usando um Azure Synapse Analytics como um coletor ou fonte, deverá escolher um local de preparo para sua carga em lote do PolyBase. O PolyBase permite o carregamento em lote em massa em vez de carregar os dados linha por linha. O PolyBase reduz drasticamente o tempo de carregamento no Azure Synapse Analytics.
Chave de ponto de verificação
Ao usar a opção de captura de alterações para fontes de fluxo de dados, o ADF mantém e gerencia o ponto de verificação para você automaticamente. A chave de ponto de verificação padrão é um hash do nome do fluxo de dados e do nome do pipeline. Se você estiver usando um padrão dinâmico para suas tabelas ou pastas de origem, talvez queira substituir esse hash e definir seu próprio valor de chave de ponto de verificação aqui.
Nível de registo
Se você não precisar que todas as execuções de pipeline de suas atividades de fluxo de dados registrem totalmente todos os logs de telemetria detalhados, você pode, opcionalmente, definir seu nível de log como "Básico" ou "Nenhum". Ao executar seus fluxos de dados no modo "Detalhado" (padrão), você está solicitando que o serviço registre totalmente a atividade em cada nível de partição individual durante a transformação de dados. Essa pode ser uma operação cara, portanto, apenas habilitar detalhadamente a solução de problemas pode melhorar o fluxo geral de dados e o desempenho do pipeline. O modo "Básico" registra apenas as durações de transformação, enquanto "Nenhum" fornece apenas um resumo das durações.
Propriedades do lavatório
O recurso de agrupamento em fluxos de dados permite que você defina a ordem de execução de seus coletores, bem como agrupe coletores usando o mesmo número de grupo. Para ajudar a gerenciar grupos, você pode pedir ao serviço para executar coletores, no mesmo grupo, em paralelo. Você também pode definir o grupo de coletores para continuar mesmo depois que um dos coletores encontrar um erro.
O comportamento padrão dos coletores de fluxo de dados é executar cada coletor sequencialmente, de maneira serial, e falhar o fluxo de dados quando um erro for encontrado no coletor. Além disso, todos os coletores são padronizados para o mesmo grupo, a menos que você entre nas propriedades de fluxo de dados e defina prioridades diferentes para os coletores.
Apenas na primeira fila
Esta opção só está disponível para fluxos de dados que tenham coletores de cache habilitados para "Saída para atividade". A saída do fluxo de dados que é injetado diretamente em seu pipeline é limitada a 2MB. Definir "somente primeira linha" ajuda a limitar a saída de dados do fluxo de dados ao injetar a saída da atividade de fluxo de dados diretamente no pipeline.
Parametrização de fluxos de dados
Conjuntos de dados parametrizados
Se o fluxo de dados usar conjuntos de dados parametrizados, defina os valores dos parâmetros na guia Configurações .
Fluxos de dados parametrizados
Se o fluxo de dados for parametrizado, defina os valores dinâmicos dos parâmetros de fluxo de dados na guia Parâmetros . Você pode usar a linguagem de expressão de pipeline ou a linguagem de expressão de fluxo de dados para atribuir valores de parâmetros dinâmicos ou literais. Para obter mais informações, consulte Parâmetros de fluxo de dados.
Propriedades de computação parametrizadas.
Você pode parametrizar a contagem de núcleos ou o tipo de computação se usar o tempo de execução da Integração do Azure de autoresolução e especificar valores para compute.coreCount e compute.computeType.
Depuração de pipeline da atividade de fluxo de dados
Para executar um pipeline de depuração executado com uma atividade de Fluxo de Dados, você deve ativar o modo de depuração de fluxo de dados por meio do controle deslizante Depurar Fluxo de Dados na barra superior. O modo de depuração permite executar o fluxo de dados em um cluster ativo do Spark. Para obter mais informações, consulte Modo de depuração.
O pipeline de depuração é executado no cluster de depuração ativo, não no ambiente de tempo de execução de integração especificado nas configurações de atividade do Fluxo de Dados. Você pode escolher o ambiente de computação de depuração ao iniciar o modo de depuração.
Monitorando a atividade de fluxo de dados
A atividade Fluxo de Dados tem uma experiência especial de monitoramento, onde você pode visualizar informações de particionamento, tempo de estágio e linhagem de dados. Abra o painel de monitorização através do ícone de óculos em Ações. Para obter mais informações, consulte Monitorando fluxos de dados.
Usar resultados da atividade Fluxo de Dados em uma atividade subsequente
A atividade de fluxo de dados produz métricas relativas ao número de linhas gravadas em cada coletor e linhas lidas de cada fonte. Esses resultados são retornados na output
seção do resultado da execução da atividade. As métricas retornadas estão no formato do json abaixo.
{
"runStatus": {
"metrics": {
"<your sink name1>": {
"rowsWritten": <number of rows written>,
"sinkProcessingTime": <sink processing time in ms>,
"sources": {
"<your source name1>": {
"rowsRead": <number of rows read>
},
"<your source name2>": {
"rowsRead": <number of rows read>
},
...
}
},
"<your sink name2>": {
...
},
...
}
}
}
Por exemplo, para chegar ao número de linhas gravadas em um coletor chamado 'sink1' em uma atividade chamada 'dataflowActivity', use @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten
.
Para obter o número de linhas lidas de uma fonte chamada 'source1' que foi usada nesse coletor, use @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead
.
Nota
Se um coletor tiver zero linhas escritas, ele não aparecerá nas métricas. A existência pode ser verificada usando a contains
função. Por exemplo, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1')
verifica se alguma linha foi gravada no sink1.
Conteúdos relacionados
Veja as atividades de fluxo de controle suportadas: