Configurar fluxos de dados nas Operações do Azure IoT
Importante
Esta página inclui instruções para gerenciar componentes do Azure IoT Operations usando manifestos de implantação do Kubernetes, que está em visualização. Esse recurso é fornecido com várias limitações e não deve ser usado para cargas de trabalho de produção.
Veja Termos de Utilização Complementares da Pré-visualizações do Microsoft Azure para obter os termos legais que se aplicam às funcionalidades do Azure que estão na versão beta, na pré-visualização ou que ainda não foram lançadas para disponibilidade geral.
Um fluxo de dados é o caminho que os dados percorrem da origem para o destino com transformações opcionais. Você pode configurar o fluxo de dados criando um recurso personalizado de fluxo de dados ou usando o portal do Azure IoT Operations Studio. Um fluxo de dados é composto por três partes: a origem, a transformação e o destino.
Para definir a origem e o destino, você precisa configurar os pontos de extremidade de fluxo de dados. A transformação é opcional e pode incluir operações como enriquecer os dados, filtrar os dados e mapeá-los para outro campo.
Importante
Cada fluxo de dados deve ter o ponto de extremidade padrão do agente MQTT local do Azure IoT Operations como origem ou destino.
Você pode usar a experiência de operações no Azure IoT Operations para criar um fluxo de dados. A experiência de operações fornece uma interface visual para configurar o fluxo de dados. Você também pode usar o Bicep para criar um fluxo de dados usando um arquivo de modelo Bicep ou usar o Kubernetes para criar um fluxo de dados usando um arquivo YAML.
Continue lendo para saber como configurar a origem, a transformação e o destino.
Pré-requisitos
Você pode implantar fluxos de dados assim que tiver uma instância do Azure IoT Operations usando o perfil de fluxo de dados padrão e o ponto de extremidade. No entanto, convém configurar perfis de fluxo de dados e pontos de extremidade para personalizar o fluxo de dados.
Perfil de fluxo de dados
Se você não precisar de configurações de dimensionamento diferentes para seus fluxos de dados, use o perfil de fluxo de dados padrão fornecido pelas Operações do Azure IoT. Para saber como configurar um perfil de fluxo de dados, consulte Configurar perfis de fluxo de dados.
Pontos de extremidade de fluxo de dados
Os pontos de extremidade de fluxo de dados são necessários para configurar a origem e o destino do fluxo de dados. Para começar rapidamente, você pode usar o ponto de extremidade de fluxo de dados padrão para o broker MQTT local. Você também pode criar outros tipos de pontos de extremidade de fluxo de dados, como Kafka, Hubs de Eventos ou Armazenamento do Azure Data Lake. Para saber como configurar cada tipo de ponto de extremidade de fluxo de dados, consulte Configurar pontos de extremidade de fluxo de dados.
Começar agora
Depois de ter os pré-requisitos, você pode começar a criar um fluxo de dados.
Para criar um fluxo de dados na experiência de operações, selecione Fluxo de>dados Criar fluxo de dados. Em seguida, você verá a página onde pode configurar a origem, a transformação e o destino para o fluxo de dados.
Analise as seções a seguir para saber como configurar os tipos de operação do fluxo de dados.
Origem
Para configurar uma fonte para o fluxo de dados, especifique a referência de ponto de extremidade e uma lista de fontes de dados para o ponto de extremidade. Escolha uma das seguintes opções como a fonte para o fluxo de dados.
Se o ponto de extremidade padrão não for usado como origem, ele deverá ser usado como destino. Para saber mais sobre, consulte Os fluxos de dados devem usar o ponto de extremidade do broker MQTT local.
Opção 1: Usar o ponto de extremidade MQTT padrão como origem
Em Detalhes do código-fonte, selecione MQTT.
Insira as seguintes configurações para a origem MQTT:
Definição Descrição Tópico MQTT O filtro de tópico MQTT para se inscrever para mensagens recebidas. Consulte Configurar tópicos MQTT ou Kafka. Esquema de mensagem O esquema a ser usado para desserializar as mensagens de entrada. Consulte Especificar esquema para desserializar dados. Selecione Aplicar.
Opção 2: Usar ativo como fonte
Você pode usar um ativo como fonte para o fluxo de dados. O uso de um ativo como fonte só está disponível na experiência de operações.
Em Detalhes da fonte, selecione Ativo.
Selecione o ativo que você deseja usar como o ponto de extremidade de origem.
Selecione Continuar.
Uma lista de pontos de dados para o ativo selecionado é exibida.
Selecione Aplicar para usar o ativo como o ponto de extremidade de origem.
Ao usar um ativo como origem, a definição de ativo é usada para inferir o esquema para o fluxo de dados. A definição de ativo inclui o esquema para os pontos de dados do ativo. Para saber mais, consulte Gerenciar configurações de ativos remotamente.
Uma vez configurados, os dados do ativo chegaram ao fluxo de dados através do broker MQTT local. Assim, ao usar um ativo como fonte, o fluxo de dados usa o ponto de extremidade padrão do broker MQTT local como a fonte na realidade.
Opção 3: Usar o ponto de extremidade de fluxo de dados MQTT ou Kafka personalizado como origem
Se você criou um ponto de extremidade de fluxo de dados MQTT ou Kafka personalizado (por exemplo, para usar com Grade de Eventos ou Hubs de Eventos), poderá usá-lo como a fonte para o fluxo de dados. Lembre-se de que os pontos de extremidade de tipo de armazenamento, como Data Lake ou Fabric OneLake, não podem ser usados como origem.
Para configurar, use Kubernetes YAML ou Bicep. Substitua valores de espaço reservado pelo nome e tópicos personalizados do ponto de extremidade.
O uso de um ponto de extremidade MQTT ou Kafka personalizado como fonte não é suportado atualmente na experiência de operações.
Configurar fontes de dados (tópicos MQTT ou Kafka)
Você pode especificar vários tópicos MQTT ou Kafka em uma fonte sem precisar modificar a configuração do ponto de extremidade do fluxo de dados. Essa flexibilidade significa que o mesmo ponto de extremidade pode ser reutilizado em vários fluxos de dados, mesmo que os tópicos variem. Para obter mais informações, consulte Reutilizar pontos de extremidade de fluxo de dados.
Tópicos MQTT
Quando a origem é um ponto de extremidade MQTT (Event Grid included), você pode usar o filtro de tópico MQTT para assinar mensagens de entrada. O filtro de tópicos pode incluir curingas para assinar vários tópicos. Por exemplo, thermostats/+/telemetry/temperature/#
subscreve todas as mensagens de telemetria de temperatura dos termostatos. Para configurar os filtros de tópico MQTT:
Nos detalhes da fonte do fluxo de dados da experiência de operações, selecione MQTT e use o campo de tópico MQTT para especificar o filtro de tópico MQTT para se inscrever para mensagens de entrada.
Nota
Apenas um filtro de tópico MQTT pode ser especificado na experiência de operações. Para usar vários filtros de tópico MQTT, use Bicep ou Kubernetes.
Subscrições partilhadas
Para usar assinaturas compartilhadas com fontes MQTT, você pode especificar o tópico de assinatura compartilhada na forma de $shared/<GROUP_NAME>/<TOPIC_FILTER>
.
Em Detalhes da fonte do fluxo de dados da experiência de operações, selecione MQTT e use o campo de tópico MQTT para especificar o grupo e o tópico de assinatura compartilhada.
Se a contagem de instâncias no perfil de fluxo de dados for maior que uma, a assinatura compartilhada será automaticamente habilitada para todos os fluxos de dados que usam a origem MQTT. Nesse caso, o prefixo $shared
é adicionado e o nome do grupo de assinatura compartilhada gerado automaticamente. Por exemplo, se você tiver um perfil de fluxo de dados com uma contagem de instâncias de 3 e seu fluxo de dados usar um ponto de extremidade MQTT como fonte configurado com tópicos topic1
e topic2
, eles serão automaticamente convertidos em assinaturas compartilhadas como $shared/<GENERATED_GROUP_NAME>/topic1
e $shared/<GENERATED_GROUP_NAME>/topic2
.
Você pode criar explicitamente um tópico nomeado $shared/mygroup/topic
em sua configuração. No entanto, adicionar o $shared
tópico explicitamente não é recomendado, pois o prefixo $shared
é adicionado automaticamente quando necessário. Os fluxos de dados podem fazer otimizações com o nome do grupo se ele não estiver definido. Por exemplo, $share
não está definido e os fluxos de dados só precisam operar sobre o nome do tópico.
Importante
Os fluxos de dados que exigem assinatura compartilhada quando a contagem de instâncias é maior que uma são importantes ao usar o agente MQTT da Grade de Eventos como origem, pois ele não oferece suporte a assinaturas compartilhadas. Para evitar mensagens ausentes, defina a contagem de instâncias do perfil de fluxo de dados como uma ao usar o agente MQTT da Grade de Eventos como origem. É quando o fluxo de dados é o assinante e o recebimento de mensagens da nuvem.
Tópicos de Kafka
Quando a origem for um ponto de extremidade Kafka (Hubs de Eventos incluídos), especifique os tópicos Kafka individuais para assinar para receber mensagens. Não há suporte para curingas, portanto, você deve especificar cada tópico estaticamente.
Nota
Ao usar Hubs de Eventos por meio do ponto de extremidade Kafka, cada hub de evento individual dentro do namespace é o tópico Kafka. Por exemplo, se você tiver um namespace Hubs de Eventos com dois hubs thermostats
de eventos e humidifiers
, poderá especificar cada hub de eventos como um tópico Kafka.
Para configurar os tópicos de Kafka:
O uso de um ponto de extremidade Kafka como fonte não é suportado atualmente na experiência de operações.
Especificar esquema de origem
Ao usar MQTT ou Kafka como origem, você pode especificar um esquema para exibir a lista de pontos de dados no portal de experiência de operações. Observe que o uso de um esquema para desserializar e validar mensagens de entrada não é suportado no momento.
Se a origem for um ativo, o esquema será automaticamente inferido a partir da definição de ativo.
Gorjeta
Para gerar o esquema a partir de um arquivo de dados de exemplo, use o Schema Gen Helper.
Para configurar o esquema usado para desserializar as mensagens de entrada de uma fonte:
Em Detalhes da fonte do fluxo de dados da experiência de operações, selecione MQTT e use o campo Esquema de mensagem para especificar o esquema. Você pode usar o botão Carregar para carregar um arquivo de esquema primeiro. Para saber mais, consulte Compreender esquemas de mensagens.
Para saber mais, consulte Compreender esquemas de mensagens.
Transformação
A operação de transformação é onde você pode transformar os dados da origem antes de enviá-los para o destino. As transformações são opcionais. Se você não precisar fazer alterações nos dados, não inclua a operação de transformação na configuração do fluxo de dados. Várias transformações são encadeadas em estágios, independentemente da ordem em que são especificadas na configuração. A ordem das etapas é sempre:
- Enriquecimento: adicione dados adicionais aos dados de origem com um conjunto de dados e uma condição correspondentes.
- Filtrar: filtre os dados com base em uma condição.
- Mapear, Calcular, Renomear ou adicionar uma propriedade Novo: mova dados de um campo para outro com uma conversão opcional.
Esta seção é uma introdução às transformações de fluxo de dados. Para obter informações mais detalhadas, consulte Mapear dados usando fluxos de dados, Converter dados usando conversões de fluxo de dados e Enriquecer dados usando fluxos de dados.
Na experiência de operações, selecione Dataflow>Add transform (opcional).
Enriquecimento: Adicionar dados de referência
Para enriquecer os dados, primeiro adicione o conjunto de dados de referência no repositório de estado do Azure IoT Operations. O conjunto de dados é usado para adicionar dados extras aos dados de origem com base em uma condição. A condição é especificada como um campo nos dados de origem que corresponde a um campo no conjunto de dados.
Você pode carregar dados de exemplo no repositório de estado usando a CLI do repositório de estado. Os nomes de chave no armazenamento de estado correspondem a um conjunto de dados na configuração de fluxo de dados.
Atualmente, o estágio Enrich não é suportado na experiência de operações.
Se o conjunto de dados tiver um registro com o asset
campo, semelhante a:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
Os dados da fonte com a deviceId
correspondência thermostat1
de campo têm os location
campos e manufacturer
disponíveis em etapas de filtro e mapa.
Para obter mais informações sobre sintaxe de condição, consulte Enriquecer dados usando fluxos de dados e Converter dados usando fluxos de dados.
Filtrar: filtrar dados com base em uma condição
Para filtrar os dados em uma condição, você pode usar o filter
palco. A condição é especificada como um campo nos dados de origem que corresponde a um valor.
Em Transformar (opcional), selecione Adicionar filtro>.
Insira as configurações necessárias.
Definição Descrição Condição de filtro A condição para filtrar os dados com base em um campo nos dados de origem. Description Forneça uma descrição para a condição do filtro. No campo de condição do filtro, insira
@
ou selecione Ctrl + Espaço para escolher pontos de dados em uma lista suspensa.Você pode inserir propriedades de metadados MQTT usando o formato
@$metadata.user_properties.<property>
ou@$metadata.topic
. Você também pode inserir cabeçalhos $metadata usando o formato@$metadata.<header>
. A$metadata
sintaxe só é necessária para propriedades MQTT que fazem parte do cabeçalho da mensagem. Para obter mais informações, consulte referências de campo.A condição pode usar os campos nos dados de origem. Por exemplo, você pode usar uma condição de filtro como
@temperature > 20
filtrar dados menores ou iguais a 20 com base no campo de temperatura.Selecione Aplicar.
Mapa: Mover dados de um campo para outro
Para mapear os dados para outro campo com conversão opcional, você pode usar a map
operação. A conversão é especificada como uma fórmula que usa os campos nos dados de origem.
Na experiência de operações, o mapeamento é atualmente suportado usando as transformações de propriedade Compute, Rename e New.
Computação
Você pode usar a transformação Compute para aplicar uma fórmula aos dados de origem. Esta operação é usada para aplicar uma fórmula aos dados de origem e armazenar o campo de resultado.
Em Transformar (opcional), selecione Adicionar computação>.
Insira as configurações necessárias.
Definição Descrição Selecionar fórmula Escolha uma fórmula existente na lista pendente ou selecione Personalizar para introduzir uma fórmula manualmente. Saída Especifique o nome de exibição da saída para o resultado. Fórmula Insira a fórmula a ser aplicada aos dados de origem. Description Forneça uma descrição para a transformação. Último valor conhecido Opcionalmente, use o último valor conhecido se o valor atual não estiver disponível. Pode introduzir ou editar uma fórmula no campo Fórmula . A fórmula pode usar os campos nos dados de origem. Insira
@
ou selecione Ctrl + Espaço para escolher pontos de dados em uma lista suspensa.Você pode inserir propriedades de metadados MQTT usando o formato
@$metadata.user_properties.<property>
ou@$metadata.topic
. Você também pode inserir cabeçalhos $metadata usando o formato@$metadata.<header>
. A$metadata
sintaxe só é necessária para propriedades MQTT que fazem parte do cabeçalho da mensagem. Para obter mais informações, consulte referências de campo.A fórmula pode usar os campos nos dados de origem. Por exemplo, você pode usar o
temperature
campo nos dados de origem para converter a temperatura em Celsius e armazená-la notemperatureCelsius
campo de saída.Selecione Aplicar.
Mudar o nome
Você pode renomear um ponto de dados usando a transformação Renomear . Esta operação é usada para renomear um ponto de dados nos dados de origem para um novo nome. O novo nome pode ser usado nos estágios subsequentes do fluxo de dados.
Em Transformar (opcional), selecione Renomear>Adicionar.
Insira as configurações necessárias.
Definição Descrição Ponto de dados Selecione um ponto de dados na lista suspensa ou insira um cabeçalho $metadata. Novo nome de ponto de dados Insira o novo nome para o ponto de dados. Description Forneça uma descrição para a transformação. Insira
@
ou selecione Ctrl + Espaço para escolher pontos de dados em uma lista suspensa.Você pode inserir propriedades de metadados MQTT usando o formato
@$metadata.user_properties.<property>
ou@$metadata.topic
. Você também pode inserir cabeçalhos $metadata usando o formato@$metadata.<header>
. A$metadata
sintaxe só é necessária para propriedades MQTT que fazem parte do cabeçalho da mensagem. Para obter mais informações, consulte referências de campo.Selecione Aplicar.
Nova propriedade
Você pode adicionar uma nova propriedade aos dados de origem usando a transformação de propriedade New. Esta operação é usada para adicionar uma nova propriedade aos dados de origem. A nova propriedade pode ser usada nos estágios subsequentes do fluxo de dados.
Em Transformar (opcional), selecione Nova propriedade>Adicionar.
Insira as configurações necessárias.
Definição Descrição Chave da propriedade Insira a chave da nova propriedade. Valor do imóvel Insira o valor da nova propriedade. Description Forneça uma descrição para a nova propriedade. Selecione Aplicar.
Para saber mais, consulte Mapear dados usando fluxos de dados e Converter dados usando fluxos de dados.
Serializar dados de acordo com um esquema
Se desejar serializar os dados antes de enviá-los para o destino, será necessário especificar um esquema e um formato de serialização. Caso contrário, os dados são serializados em JSON com os tipos inferidos. Pontos de extremidade de armazenamento como o Microsoft Fabric ou o Azure Data Lake exigem um esquema para garantir a consistência dos dados. Os formatos de serialização suportados são Parquet e Delta.
Gorjeta
Para gerar o esquema a partir de um arquivo de dados de exemplo, use o Schema Gen Helper.
Para experiência em operações, especifique o esquema e o formato de serialização nos detalhes do ponto de extremidade do fluxo de dados. Os pontos de extremidade que oferecem suporte a formatos de serialização são Microsoft Fabric OneLake, Azure Data Lake Storage Gen 2 e Azure Data Explorer. Por exemplo, para serializar os dados no formato Delta, você precisa carregar um esquema no registro do esquema e fazer referência a ele na configuração do ponto de extremidade de destino do fluxo de dados.
Para obter mais informações sobre o registro de esquema, consulte Compreender esquemas de mensagem.
Destino
Para configurar um destino para o fluxo de dados, especifique a referência do ponto de extremidade e o destino dos dados. Você pode especificar uma lista de destinos de dados para o ponto de extremidade.
Para enviar dados para um destino diferente do broker MQTT local, crie um ponto de extremidade de fluxo de dados. Para saber como, consulte Configurar pontos de extremidade de fluxo de dados. Se o destino não for o broker MQTT local, ele deverá ser usado como origem. Para saber mais sobre, consulte Os fluxos de dados devem usar o ponto de extremidade do broker MQTT local.
Importante
Os pontos de extremidade de armazenamento exigem um esquema para serialização. Para usar o fluxo de dados com o Microsoft Fabric OneLake, o Azure Data Lake Storage, o Azure Data Explorer ou o Armazenamento Local, você deve especificar uma referência de esquema.
Selecione o ponto de extremidade de fluxo de dados a ser usado como destino.
Os pontos de extremidade de armazenamento exigem um esquema para serialização. Se você escolher um Microsoft Fabric OneLake, Azure Data Lake Storage, Azure Data Explorer ou ponto de extremidade de destino do Armazenamento Local, deverá especificar uma referência de esquema. Por exemplo, para serializar os dados para um ponto de extremidade do Microsoft Fabric no formato Delta, você precisa carregar um esquema no registro do esquema e fazer referência a ele na configuração do ponto de extremidade de destino do fluxo de dados.
Selecione Continuar para configurar o destino.
Insira as configurações necessárias para o destino, incluindo o tópico ou a tabela para a qual enviar os dados. Consulte Configurar destino de dados (tópico, contêiner ou tabela) para obter mais informações.
Configurar destino de dados (tópico, contêiner ou tabela)
Semelhante às fontes de dados, o destino de dados é um conceito usado para manter os pontos de extremidade de fluxo de dados reutilizáveis em vários fluxos de dados. Essencialmente, ele representa o subdiretório na configuração do ponto de extremidade do fluxo de dados. Por exemplo, se o ponto de extremidade do fluxo de dados for um ponto de extremidade de armazenamento, o destino dos dados será a tabela na conta de armazenamento. Se o ponto de extremidade do fluxo de dados for um ponto de extremidade Kafka, o destino dos dados será o tópico Kafka.
Tipo de ponto final | Significado do destino dos dados | Description |
---|---|---|
MQTT (ou Grade de Eventos) | Tópico | O tópico MQTT para onde os dados são enviados. Apenas tópicos estáticos são suportados, sem curingas. |
Kafka (ou Hubs de Eventos) | Tópico | O tópico Kafka para onde os dados são enviados. Apenas tópicos estáticos são suportados, sem curingas. Se o ponto de extremidade for um namespace de Hubs de Eventos, o destino de dados será o hub de eventos individual dentro do namespace. |
Azure Data Lake Storage | Contentor | O contêiner na conta de armazenamento. Não a tabela. |
Microsoft Fabric OneLake | Tabela ou pasta | Corresponde ao tipo de caminho configurado para o ponto de extremidade. |
Azure Data Explorer | Tabela | A tabela no banco de dados do Azure Data Explorer. |
Armazenamento Local | Pasta | O nome da pasta ou diretório na montagem de volume persistente de armazenamento local. Ao usar o Armazenamento de Contêiner do Azure habilitado pelo Azure Arc Cloud Ingest Edge Volumes, isso deve corresponder ao spec.path parâmetro para o subvolume que você criou. |
Para configurar o destino dos dados:
Ao usar a experiência de operações, o campo de destino de dados é interpretado automaticamente com base no tipo de ponto final. Por exemplo, se o ponto de extremidade do fluxo de dados for um ponto de extremidade de armazenamento, a página de detalhes do destino solicitará que você insira o nome do contêiner. Se o ponto de extremidade do fluxo de dados for um ponto de extremidade MQTT, a página de detalhes do destino solicitará que você insira o tópico e assim por diante.
Exemplo
O exemplo a seguir é uma configuração de fluxo de dados que usa o ponto de extremidade MQTT para a origem e o destino. A fonte filtra os dados do tópico azure-iot-operations/data/thermostat
MQTT. A transformação converte a temperatura em Fahrenheit e filtra os dados onde a temperatura multiplicada pela humidade é inferior a 100000. O destino envia os dados para o tópico factory
MQTT .
Consulte as guias Bicep ou Kubernetes para obter o exemplo de configuração.
Para ver mais exemplos de configurações de fluxo de dados, consulte API REST do Azure - Fluxo de dados e o Bicep de início rápido.
Verificar se um fluxo de dados está funcionando
Siga o Tutorial: Ponte MQTT bidirecional para a Grade de Eventos do Azure para verificar se o fluxo de dados está funcionando.
Exportar configuração de fluxo de dados
Para exportar a configuração de fluxo de dados, você pode usar a experiência de operações ou exportando o recurso personalizado de fluxo de dados.
Selecione o fluxo de dados que deseja exportar e selecione Exportar na barra de ferramentas.
Configuração adequada do fluxo de dados
Para garantir que o fluxo de dados esteja funcionando conforme o esperado, verifique o seguinte:
- O ponto de extremidade de fluxo de dados MQTT padrão deve ser usado como origem ou destino.
- O perfil de fluxo de dados existe e é referenciado na configuração de fluxo de dados.
- Source é um endpoint MQTT, um endpoint Kafka ou um ativo. Os pontos de extremidade do tipo de armazenamento não podem ser usados como origem.
- Ao usar a Grade de Eventos como origem, a contagem de instâncias do perfil de fluxo de dados é definida como 1 porque o agente MQTT da Grade de Eventos não oferece suporte a assinaturas compartilhadas.
- Ao usar Hubs de Eventos como origem, cada hub de eventos no namespace é um tópico Kafka separado e deve ser especificado como a fonte de dados.
- A transformação, se usada, é configurada com sintaxe adequada, incluindo a fuga adequada de caracteres especiais.
- Ao usar pontos de extremidade de tipo de armazenamento como destino, um esquema é especificado.