Configurar fluxos de dados no Azure IoT Operations
Importante
A Versão Prévia das Operações da Internet das Coisas do Azure – habilitadas pelo Azure Arc – está atualmente em versão prévia. Você não deve usar esse software em versão prévia em ambientes de produção.
Você precisará implantar uma nova instalação do Azure IoT Operations quando uma versão geralmente disponível estiver disponível. Você não poderá atualizar uma instalação de versão prévia.
Para os termos legais que se aplicam aos recursos do Azure que estão em versão beta, versão prévia ou que, de outra forma, ainda não foram lançados em disponibilidade geral, confira os Termos de Uso Complementares para Versões Prévias do Microsoft Azure.
Um fluxo de dados é o caminho que os dados levam da origem para o destino com transformações opcionais. Você pode configurar o fluxo de dados criando um recurso personalizado Fluxo de dados ou usando o portal do Azure IoT Operations Studio. Um fluxo de dados é composto de três partes: a origem, a transformação e o destino.
Para definir a origem e o destino, você precisa configurar os pontos de extremidade do fluxo de dados. A transformação é opcional e pode incluir operações como enriquecimento de dados, filtragem de dados e mapeamento de dados para outro campo.
Importante
Cada fluxo de dados deve ter o ponto de extremidade padrão do agente MQTT local das Operações do Azure IoT como a origem ou destino.
Você pode usar a experiência de operações no Operações do Azure IoT para criar um fluxo de dados. A experiência de operações fornece uma interface visual para configurar o fluxo de dados. Você também pode usar o Bicep para criar um fluxo de dados usando um arquivo de modelo do Bicep ou usar o Kubernetes para criar um fluxo de dados usando um arquivo YAML.
Continue lendo para saber como configurar a fonte, a transformação e o destino.
Pré-requisitos
Você pode implantar fluxos de dados assim que tiver uma instância das Operações do Azure IoT Versão Prévia usando o perfil de fluxo de dados e o ponto de extremidade padrão. No entanto, talvez você queira configurar perfis de fluxo de dados e pontos de extremidade para personalizar o fluxo de dados.
Perfil de fluxo de dados
O perfil de fluxo de dados especifica o número de instâncias a serem usadas pelos fluxos de dados sob ele. Se você não precisar de vários grupos de fluxos de dados com diferentes configurações de escala, poderá usar o perfil de fluxo de dados padrão. Para saber como configurar um perfil de fluxo de dados, confira Configurar perfis de fluxo de dados.
Pontos de extremidade de fluxo de dados
Os pontos de extremidade de fluxo de dados são necessários para configurar a fonte e o destino do fluxo de dados. Para começar rapidamente, você pode usar o ponto de extremidade de fluxo de dados padrão para o agente MQTT local. Você também pode criar outros tipos de pontos de extremidade de fluxo de dados, como Kafka, Hubs de Eventos ou Azure Data Lake Storage. Para saber como configurar cada tipo de ponto de extremidade de fluxo de dados, veja Configurar pontos de extremidade de fluxo de dados.
Introdução
Quando tiver os pré-requisitos, você poderá começar a criar um fluxo de dados.
Para criar um fluxo de dados na experiência de operações, selecione Fluxo de dados>Criar fluxo de dados. Em seguida, você verá a página em que poderá configurar a fonte, a transformação e o destino do fluxo de dados.
Examine as seções a seguir para saber como configurar os tipos de operação do fluxo de dados.
Origem
Para configurar uma fonte para o fluxo de dados, especifique a referência do ponto de extremidade e uma lista de fontes de dados para o ponto de extremidade.
Usar ativo como origem
Você pode usar um ativo como fonte para o fluxo de dados. O uso de um ativo como fonte só está disponível na experiência de operações.
Em Detalhes da fonte, selecione Ativo.
Selecione o ativo que você deseja usar como ponto de extremidade de origem.
Selecione Continuar.
Uma lista de pontos de dados para o ativo selecionado é exibida.
Selecione Aplicar para usar o ativo como o ponto de extremidade de origem.
Ao usar um ativo como origem, a definição do ativo é usada para inferir o esquema para o fluxo de dados. A definição do ativo inclui o esquema para os pontos de dados do ativo. Para saber mais, consulte Gerenciar configurações do ativo remotamente.
Depois de configurado, os dados do ativo chegam ao fluxo de dados via o agente MQTT local. Portanto, ao usar um ativo como origem, o fluxo de dados usa o ponto de extremidade padrão do agente MQTT local como origem na prática.
Usar o ponto de extremidade MQTT padrão como fonte
Em Detalhes da fonte, selecione MQTT.
Insira as seguintes configurações para a fonte MQTT:
Configuração Descrição Tópico do MQTT O filtro de tópico MQTT a ser assinado para as mensagens de entrada. Confira Configurar tópicos do MQTT ou do Kafka. Esquema de mensagem O esquema a ser usado para desserializar as mensagens de entrada. Confira Especificar o esquema para desserializar dados. Escolha Aplicar.
Se o ponto de extremidade padrão não for usado como origem, ele deve ser usado como destino. Para saber mais sobre, consulte Os fluxos de dados devem usar o ponto de extremidade do agente MQTT local.
Usar o ponto de extremidade de fluxo de dados MQTT ou Kafka personalizado como fonte
Se você criou um ponto de extremidade de fluxo de dados MQTT ou Kafka personalizado (por exemplo, para usar com a Grade de Eventos ou Hubs de Eventos), poderá usá-lo como a fonte para o fluxo de dados. Lembre-se de que os pontos de extremidade do tipo de armazenamento, como Data Lake ou Fabric OneLake, não podem ser usados como fonte.
Para configurar, use o YAML do Kubernetes ou o Bicep. Substitua os valores de espaço reservado por seu nome de ponto de extremidade personalizado e tópicos.
No momento, não há suporte para o uso de um ponto de extremidade MQTT ou Kafka personalizado como fonte na experiência de operações.
Configurar fontes de dados (tópicos MQTT ou Kafka)
Você pode especificar vários tópicos MQTT ou Kafka em uma fonte sem precisar modificar a configuração do ponto de extremidade do fluxo de dados. Essa flexibilidade significa que o mesmo ponto de extremidade pode ser reutilizado em vários fluxos de dados, mesmo que os tópicos variem. Para obter mais informações, confira Reutilizar pontos de extremidade de fluxo de dados.
Tópicos do MQTT
Quando a fonte for um ponto de extremidade MQTT (Grade de Eventos incluída), você pode usar o filtro de tópico MQTT para assinar mensagens de entrada. O filtro de tópico pode incluir curingas para assinar vários tópicos. Por exemplo, thermostats/+/telemetry/temperature/#
assina todas as mensagens de telemetria de temperatura dos termostatos. Para configurar os filtros de tópico do MQTT:
Nos Detalhes da fonte de fluxo de dados da experiência de operações, selecione MQTT e, em seguida, use o campo Tópico MQTT para especificar o filtro de tópico MQTT a ser assinado para as mensagens de entrada.
Observação
Somente um filtro de tópico MQTT pode ser especificado na experiência de operações. Para usar vários filtros de tópico MQTT, use o Bicep ou o Kubernetes.
Assinaturas compartilhadas
Para usar assinaturas compartilhadas com fontes MQTT, você pode especificar o tópico de assinatura compartilhada no formato de $shared/<GROUP_NAME>/<TOPIC_FILTER>
.
Em Detalhes da fonte do fluxo de dados da experiência de operações, selecione MQTT e use o campo Tópico MQTT para especificar o grupo de assinatura compartilhada e o tópico.
Se a contagem de instâncias no perfil do fluxo de dados for maior que 1, a assinatura compartilhada é habilitada automaticamente para todos os fluxos de dados que usam a origem MQTT. Neste caso, o prefixo $shared
é adicionado e o nome do grupo de assinatura compartilhada gerado automaticamente. Por exemplo, se você tiver um perfil de fluxo de dados com uma contagem de instâncias de 3 e seu fluxo de dados usar um ponto de extremidade MQTT como origem configurado com os tópicos topic1
e topic2
, eles são convertidos automaticamente em assinaturas compartilhadas como $shared/<GENERATED_GROUP_NAME>/topic1
e $shared/<GENERATED_GROUP_NAME>/topic2
. Se você quiser usar uma ID de grupo de assinatura compartilhada diferente, pode substituí-lo no tópico, como $shared/mygroup/topic1
.
Importante
Os fluxos de dados que requerem assinatura compartilhada quando a contagem de instâncias é maior que 1 são importantes ao usar o agente MQTT da Grade de Eventos como origem, pois não suportam assinaturas compartilhadas. Para evitar mensagens ausentes, defina a contagem de instâncias do perfil do fluxo de dados como 1 ao usar o agente MQTT da Grade de Eventos como origem. Isso ocorre quando o fluxo de dados é o assinante e recebe mensagens da nuvem.
Tópico do Kafka
Quando a fonte for um ponto de extremidade Kafka (Hubs de Eventos incluídos), especifique os tópicos kafka individuais aos quais assinar mensagens de entrada. Não há suporte para curingas, portanto, você deve especificar cada tópico estaticamente.
Observação
Ao usar os Hubs de Eventos por meio do ponto de extremidade do Kafka, cada hub de eventos individual dentro do namespace é o tópico do Kafka. Por exemplo, se você tiver um namespace dos Hubs de Eventos com dois hubs de eventos, thermostats
e humidifiers
, poderá especificar cada hub de eventos como um tópico do Kafka.
Para configurar os tópicos do Kafka:
No momento, não há suporte para o uso de um ponto de extremidade do Kafka como uma fonte na experiência de operações.
Especificar esquema para desserializar dados
Se os dados de origem tiverem campos opcionais ou campos com tipos diferentes, especifique um esquema de desserialização para garantir a consistência. Por exemplo, os dados podem ter campos que não estão presentes em todas as mensagens. Sem o esquema, a transformação não pode manipular esses campos, pois eles teriam valores vazios. Com o esquema, você pode especificar valores padrão ou ignorar os campos.
Especificar o esquema só é relevante ao usar a fonte MQTT ou Kafka. Se a fonte for um ativo, o esquema será automaticamente inferido da definição de ativo.
Para configurar o esquema usado para desserializar as mensagens de entrada de uma fonte:
Nos Detalhes da fonte de fluxo de dados da experiência de operações, selecione MQTT e use o campo Esquema de mensagem para especificar o esquema. Você pode usar o botão Carregar para carregar um arquivo de esquema primeiro. Para saber mais, confira Entenda os esquemas de mensagens.
Transformação
A operação de transformação é o local em que você pode transformar os dados da fonte antes de enviá-los para o destino. As transformações são opcionais. Se você não precisar fazer alterações nos dados, não inclua a operação de transformação na configuração do fluxo de dados. Várias transformações são encadeadas em fases, independentemente da ordem em que são especificadas na configuração. A ordem dos estágios é sempre:
- Enriquecer, Renomear ou adicionar uma Nova propriedade: adicione dados adicionais aos dados de origem, considerando um conjunto de dados e uma condição correspondentes.
- Filtro: Filtra os dados com base em uma condição.
- Mapear ou Calcular: mova dados de um campo para outro com uma conversão opcional.
Na experiência de operações, selecione Fluxo de dados>Add transformação (opcional).
Enriquecer: adicionar dados de referência
Para enriquecer os dados, você pode usar o conjunto de dados de referência no DSS (repositório de estado distribuído) das Operações do Azure IoT. O conjunto de dados é usado para adicionar dados extras aos dados de origem com base em uma condição. A condição é especificada como um campo nos dados de origem que corresponde a um campo no conjunto de dados.
Você pode carregar dados de exemplo no DSS usando a amostra de ferramenta de conjunto do DSS. Os nomes de chave no repositório de estado distribuído correspondem a um conjunto de dados na configuração do fluxo de dados.
Na experiência de operações, a etapa Enriquecer atualmente tem suporte usando as transformações Renomear e Nova propriedade.
Na experiência de operações, selecione um fluxo de dados e, em seguida, Adicionar transformação (opcional).
Escolha as transformações Renomear ou Nova propriedade e, em seguida, selecione Adicionar.
Se o conjunto de dados tiver um registro com o campo asset
, semelhante a:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
Os dados da fonte com o campo deviceId
correspondente a thermostat1
têm os campos location
e manufacturer
disponíveis nos estágios de filtro e mapa.
Para obter mais informações sobre a sintaxe de condição, confira Enriquecer dados usando fluxos de dados e Converter dados usando fluxos de dados.
Filtro: filtrar dados com base em uma condição
Para filtrar os dados em uma condição, você pode usar o estágio filter
. A condição é especificada como um campo nos dados de origem que corresponde a um valor.
Em Transformar (opcional), selecione Filtrar>Adicionar.
Escolha os pontos de dados a serem incluídos no conjunto de dados.
Adicione uma condição de filtro e uma descrição.
Escolha Aplicar.
Por exemplo, você pode usar uma condição de filtro como temperature > 20
para filtrar dados menores ou iguais a 20 com base no campo de temperatura.
Mapeamento: mover dados de um campo para outro
Para mapear os dados para outro campo com conversão opcional, você pode usar a operação map
. A conversão é especificada como uma fórmula que usa os campos nos dados de origem.
Na experiência de operações, atualmente há suporte para mapeamento usando transformações de computação.
Em Transformar (opcional), selecione Calcular >Adicionar.
Insira os campos e expressões obrigatórios.
Escolha Aplicar.
Para saber mais, confira Mapear dados usando fluxos de dados e Converter dados usando fluxos de dados.
Serializar dados de acordo com um esquema
Se você quiser serializar os dados antes de enviá-los ao destino, precisará especificar um esquema e um formato de serialização. Caso contrário, os dados são serializados em JSON com os tipos inferidos. Os pontos de extremidade de armazenamento, como o Microsoft Fabric ou o Azure Data Lake, exigem um esquema para garantir a consistência dos dados. Os formatos de serialização com suporte são o Parquet e o Delta.
Atualmente, não há suporte para a especificação do esquema de saída e da serialização na experiência de operações.
Para obter mais informações sobre o registro de esquemas, veja Entender esquemas de mensagens.
Destino
Para configurar um destino para o fluxo de dados, especifique a referência do ponto de extremidade e o destino dos dados. Você pode especificar uma lista de destinos de dados para o ponto de extremidade.
Para enviar dados a um destino que não seja o agente MQTT local, crie um ponto de extremidade de fluxo de dados. Para saber como, confira Configurar pontos de extremidade de fluxo de dados. Se o destino não for o agente MQTT local, ele deve ser usado como origem. Para saber mais sobre, consulte Os fluxos de dados devem usar o ponto de extremidade do agente MQTT local.
Importante
Os pontos de extremidade de armazenamento exigem uma referência de esquema. Se você criou pontos de extremidade de destino de armazenamento para o Microsoft Fabric OneLake, ADLS Gen 2, Azure Data Explorer e Armazenamento Local, especifique a referência do esquema.
Selecione o ponto de extremidade do fluxo de dados a ser usado como destino.
Selecione Continuar para configurar o destino.
Insira as configurações necessárias para o destino, incluindo o tópico ou a tabela para o qual enviar os dados. Confira Configurar o destino dos dados (tópico, contêiner ou tabela) para obter mais informações.
Configurar o destino dos dados (tópico, contêiner ou tabela)
Semelhante às fontes de dados, o destino dos dados é um conceito usado para manter os pontos de extremidade do fluxo de dados reutilizáveis em vários fluxos de dados. Essencialmente, ele representa o subdiretório na configuração do ponto de extremidade do fluxo de dados. Por exemplo, se o ponto de extremidade do fluxo de dados for um ponto de extremidade de armazenamento, o destino de dados será a tabela na conta de armazenamento. Se o ponto de extremidade do fluxo de dados for um ponto de extremidade do Kafka, o destino dos dados será o tópico do Kafka.
Tipo de ponto de extremidade | Significado do destino de dados | Descrição |
---|---|---|
MQTT (ou Grade de Eventos) | Tópico | O tópico MQTT para onde os dados são enviados. Há suporte apenas para tópicos estáticos, sem curingas. |
Kafka (ou Hubs de Eventos) | Tópico | O tópico do Kafka para onde os dados são enviados. Há suporte apenas para tópicos estáticos, sem curingas. Se o ponto de extremidade for um namespace dos Hubs de Eventos, o destino dos dados será o hub de eventos individual dentro do namespace. |
Armazenamento do Azure Data Lake | Contêiner | O contêiner na conta de armazenamento. Não é a tabela. |
Microsoft Fabric OneLake | Arquivo ou Pasta | Corresponde ao tipo de caminho configurado para o ponto de extremidade. |
Azure Data Explorer | Tabela | A tabela no banco de dados do Azure Data Explorer. |
Armazenamento local | Pasta | O nome da pasta ou do diretório na montagem do volume persistente do armazenamento local. Ao usar o Armazenamento de contêineres do Azure habilitado pelo Azure Arc Cloud Ingest Edge Volumes, isso deve corresponder ao parâmetro spec.path para o subvolume que você criou. |
Para configurar o destino de dados:
Ao usar a experiência de operações, o campo de destino dos dados é interpretado automaticamente com base no tipo de ponto de extremidade. Por exemplo, se o ponto de extremidade do fluxo de dados for um ponto de extremidade de armazenamento, a página de detalhes do destino solicitará que você insira o nome do contêiner. Se o ponto de extremidade do fluxo de dados for um ponto de extremidade MQTT, a página de detalhes do destino solicitará que você insira o tópico e assim por diante.
Exemplo
O exemplo a seguir é uma configuração de fluxo de dados que usa o ponto de extremidade MQTT para a origem e o destino. A origem filtra os dados do tópico MQTT azure-iot-operations/data/thermostat
. A transformação converte a temperatura para Fahrenheit e filtra os dados onde a temperatura multiplicada pela umidade é menor que 100000. O destino envia os dados para o tópico MQTT factory
.
Veja as guias Bicep ou Kubernetes para o exemplo de configuração.
Para ver mais exemplos de configurações de fluxo de dados, consulte API REST do Azure - Fluxo de Dados e o Bicep de início rápido.
Verifique se um fluxo de dados está funcionando
Siga Tutorial: Ponte MQTT bidirecional para o Grade de Eventos do Azure para verificar se o fluxo de dados está funcionando.
Exportar configuração de fluxo de dados
Para exportar a configuração do fluxo de dados, você poderá usar a experiência de operações ou exportar o recurso personalizado do fluxo de dados.
Selecione o fluxo de dados que deseja exportar e selecione Exportar na barra de ferramentas.
Configuração adequada do fluxo de dados
Para garantir que o fluxo de dados esteja funcionando como esperado, verifique o seguinte:
- O ponto de extremidade padrão do fluxo de dados MQTT deve ser usado como origem ou destino.
- O perfil do fluxo de dados existe e é referenciado na configuração do fluxo de dados.
- A origem é um ponto de extremidade MQTT, ponto de extremidade Kafka ou um ativo. Os pontos de extremidade do tipo armazenamento não podem ser usados como origem.
- Ao usar a Grade de Eventos como origem, a contagem de instâncias do perfil do fluxo de dados é definida como 1 porque o agente MQTT da Grade de Eventos não suporta assinaturas compartilhadas.
- Ao usar os Hubs de Eventos como origem, cada hub de eventos no namespace é um tópico Kafka separado e deve ser especificado como a fonte de dados.
- A transformação, se usada, é configurada com a sintaxe adequada, incluindo o escape de caracteres especiais adequado.
- Ao usar pontos de extremidade do tipo armazenamento como destino, um esquema é especificado.