Compartilhar via


Transmitir dados como entrada no Stream Analytics

O Stream Analytics tem integração de primeira classe com fluxos de dados do Azure como entradas de quatro tipos de recursos:

Esses recursos de entrada podem residir na mesma assinatura do Azure que o trabalho do Stream Analytics, ou em uma assinatura diferente.

Compactação

O Stream Analytics dá suporte à compactação para todas as fontes de entrada. Os tipos de compactação compatíveis são: Nenhuma, Gzip e Deflate. O suporte à compactação não está disponível para dados de referência. Se os dados de entrada forem dados Avro compactados, o Stream Analytics os tratará de forma transparente. Não é necessário especificar o tipo de compactação com a serialização do Avro.

Criar, editar ou testar entradas

Você pode usar o Portal do Azure, o Visual Studio e o Visual Studio Code para adicionar e exibir ou editar entradas existentes no trabalho de streaming. Além disso, você pode testar conexões de entrada e consultas de teste de dados de exemplo no portal do Azure, no Visual Studio e no Visual Studio Code. Ao gravar uma consulta, você lista a entrada na cláusula FROM. Você pode obter a lista de entradas disponíveis na página Consulta no portal. Se você quiser usar várias entradas, JOIN elas ou grave várias consultas SELECT.

Observação

Recomendamos enfaticamente que você use as ferramentas Stream Analytics para Visual Studio Code para obter a melhor experiência de desenvolvimento local. Há lacunas de recursos conhecidas nas ferramentas do Stream Analytics para Visual Studio 2019 (versão 2.6.3000.0), e isso não será aprimorado no futuro.

Transmitir dados dos Hubs de Eventos

Os Hubs de Eventos do Azure é um ingestor de eventos de publicação e assinatura altamente escalonável. Um Hub de Eventos pode incluir milhões de eventos por segundo, para que você possa processar e analisar grandes quantidades de dados produzidos por seus aplicativos e dispositivos conectados. Juntos, os Hubs de Eventos e o Stream Analytics podem fornecer uma solução de ponta a ponta para análise em tempo real. Os Hubs de Eventos permitem que você envie eventos para o Azure em tempo real, e os trabalhos do Stream Analytics podem processá-los em tempo real. Por exemplo, você pode enviar cliques da Web, leituras do sensor ou eventos de log online para Hubs de Eventos. Em seguida, você pode criar trabalhos do Stream Analytics para usar os Hubs de Eventos como dados de entrada para filtragem, agregação e correlação em tempo real.

EventEnqueuedUtcTime é o carimbo de data/hora da chegada de um evento em um hub de eventos e é o carimbo de data/hora padrão de eventos provenientes dos Hubs de Eventos para o Stream Analytics. Para processar os dados como uma transmissão usando um carimbo de data/hora na carga do evento, você deve usar a palavra-chave TIMESTAMP BY.

Grupo de consumidores de Hubs de Eventos

Você deve configurar cada entrada do hub de eventos para ter seu próprio grupo de consumidores. Quando um trabalho contém uma autojunção ou tem várias entradas, algumas entradas podem ser lidas por mais de um leitor downstream. Essa situação afeta o número de leitores em um único grupo de consumidores. Para evitar exceder o limite de Hub de Eventos dos cinco leitores por grupo de consumidores por partição, é uma melhor prática designar um grupo de consumidores para cada trabalho do Stream Analytics. Também há um limite de 20 grupos de consumidores por Hub de Eventos de nível padrão. Para obter mais informações, consulte entradas Solucionar problemas do Azure Stream Analytics.

Criar uma entrada dos Hubs de Eventos

A tabela a seguir explica cada propriedade na página Nova entrada no portal do Azure para transmitir entrada de dados de um hub de eventos:

Propriedade Descrição
Alias de entrada Um nome amigável que você usa na consulta do trabalho para fazer referência a essa entrada.
Assinatura Escolha a assinatura do Azure na qual o recurso do Hub de Eventos existe.
Namespace do Hub de Eventos O namespace Hubs de Eventos é um contêiner para o hubs de eventos. Ao criar um hub de eventos, crie também o namespace.
Nome do Hub de Eventos O nome do Hub de Eventos para usar como entrada.
Grupo de consumidores de Hub de Eventos (recomendado) É recomendável usar um grupo de consumidores distinto para cada trabalho do Stream Analytics. Esta cadeia de caracteres identifica o grupo de consumidores a ser usado para ingerir dados do hub de eventos. Se nenhum grupo de consumidores for especificado, o trabalho do Stream Analytics usará o grupo de consumidores $Default.
Modo de autenticação Especifique o tipo de autenticação que deseja usar para se conectar ao hub de eventos. Você pode usar uma cadeia de conexão ou uma identidade gerenciada para se autenticar no hub de eventos. Para a opção de identidade gerenciada, você pode criar uma identidade gerenciada atribuída pelo sistema para o trabalho do Stream Analytics ou uma identidade gerenciada atribuída pelo usuário para autenticação com o hub de eventos. Quando você usa uma identidade gerenciada, a identidade gerenciada deve ser membro das funções Receptor de Dados de Hubs de Eventos do Azure ou Proprietário de Dados de Hubs de Eventos do Azure.
Nome da política do Hub de Eventos A política de acesso compartilhado que fornece acesso ao Hub de Eventos. Cada política de acesso compartilhado tem um nome, as permissões definidas por você e as chaves de acesso. Essa opção é preenchida automaticamente, a menos que você selecione a opção para fornecer as configurações do Hub de Eventos manualmente.
Chave de partição É um campo opcional que está disponível somente se seu trabalho estiver configurado para usar o nível de compatibilidade 1.2 ou superior. Se a entrada for particionada por uma propriedade, você poderá adicionar o nome dessa propriedade. É usado para melhorar o desempenho da sua consulta se ela incluir uma cláusula PARTITION BY ou GROUP BY nessa propriedade. Se esse trabalho usar o nível de compatibilidade 1.2 ou superior, o padrão desse campo será PartitionId.
Formato de serialização do evento O formato de serialização (JSON, CSV, Avro, Parquet ou outros (Protobuf, XML, proprietário...)) do fluxo de dados de entrada. Verifique se o formato JSON está alinhado com a especificação e não inclui um 0 à esquerda para números decimais.
Codificação UTF-8 é o único formato de codificação com suporte no momento.
Tipo de compactação do evento O tipo de compactação usado para ler o fluxo de dados de entrada, como nenhum (padrão), Gzip ou Deflate.
Registro de esquema (versão prévia) É possível selecionar o registro de esquemas com esquemas para os dados de eventos recebidos do hub de eventos.

Quando seus dados vêm de uma entrada de fluxo do Hub de Eventos, você tem acesso aos seguintes campos de metadados em sua consulta do Stream Analytics:

Propriedade Descrição
EventProcessedUtcTime A data e a hora em que o Stream Analytics processa o evento.
EventEnqueuedUtcTime A data e a hora em que o Event Hubs recebe os eventos.
PartitionId A ID de partição com base em zero para o adaptador de entrada.

Por exemplo, usando esses campos, você pode escrever uma consulta como o exemplo a seguir:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Observação

Ao usar os Hubs de Eventos como um ponto de extremidade para Rotas do Hub IoT, você pode acessar os metadados do Hub IoT usando a função GetMetadataPropertyValue.

Transmitir dados do Hub IoT

O Hub IoT do Azure é um ingestor de eventos altamente escalonável de publicação/assinatura e otimizado para cenários de IoT.

O carimbo de data/hora padrão de eventos provenientes de um Hub IoT no Stream Analytics é o carimbo de data/hora de que o evento foi recebido no Hub IoT, que é EventEnqueuedUtcTime. Para processar os dados como uma transmissão usando um carimbo de data/hora na carga do evento, você deve usar a palavra-chave TIMESTAMP BY.

Grupo de consumidores do Hub IoT

Cada entrada do Hub IoT do Stream Analytics deve ser configurada para ter seu próprio grupo de consumidores. Quando um trabalho contém uma autojunção ou ele tem várias entradas, algumas entradas podem ser lidas por mais de um leitor de downstream. Essa situação afeta o número de leitores em um único grupo de consumidores. Para evitar exceder o limite do Hub IoT do Azure dos cinco leitores por grupo de consumidores por partição, é uma melhor prática designar um grupo de consumidores para cada trabalho do Stream Analytics.

Configurar um Hub IoT como uma entrada do fluxo de dados

A tabela a seguir explica cada propriedade na página Nova entrada no portal do Azure, quando você configura um Hub IoT como entrada de fluxo.

Propriedade Descrição
Alias de entrada Um nome amigável que você usa na consulta do trabalho para fazer referência a essa entrada.
Assinatura Escolha a assinatura na qual existem os recursos de Hub IoT existentes.
Hub IoT O nome do Hub IoT para usar como entrada.
Grupo de consumidores É recomendável usar um grupo de consumidores distinto para cada trabalho do Stream Analytics. O grupo de consumidores é usado para ingerir dados do Hub IoT. O Stream Analytics usa o grupo de consumidores $Default, a menos que você especifique o contrário.
Nome da política de acesso compartilhado A política de acesso compartilhado que fornece acesso ao Hub IoT. Cada política de acesso compartilhado tem um nome, as permissões definidas por você e as chaves de acesso.
Chave da política de acesso compartilhado A chave de acesso compartilhado usada para autorizar o acesso ao Hub IoT. Essa opção é preenchida automaticamente, a menos que você selecione a opção de fornecer as configurações do Hub IoT manualmente.
Ponto de extremidade O ponto de extremidade para o Hub IoT.
Chave de partição É um campo opcional que está disponível somente se seu trabalho estiver configurado para usar o nível de compatibilidade 1.2 ou superior. Se a entrada for particionada por uma propriedade, você poderá adicionar o nome dessa propriedade. É usado para melhorar o desempenho da sua consulta se ela incluir uma cláusula PARTITION BY ou GROUP BY nessa propriedade. Se esse trabalho usar o nível de compatibilidade 1.2 ou superior, o padrão desse campo será "PartitionId."
Formato de serialização do evento O formato de serialização (JSON, CSV, Avro, Parquet ou outros (Protobuf, XML, proprietário...)) do fluxo de dados de entrada. Verifique se o formato JSON está alinhado com a especificação e não inclui um 0 à esquerda para números decimais.
Codificação UTF-8 é o único formato de codificação com suporte no momento.
Tipo de compactação do evento O tipo de compactação usado para ler o fluxo de dados de entrada, como nenhum (padrão), Gzip ou Deflate.

Ao usar dados de fluxo provenientes de um Hub IoT, você poderá acessar alguns campos de metadados em sua consulta do Stream Analytics:

Propriedade Descrição
EventProcessedUtcTime A data e a hora em que o evento foi processado.
EventEnqueuedUtcTime A data e a hora em que o Hub IoT recebe o evento.
PartitionId A ID de partição com base em zero para o adaptador de entrada.
IoTHub.MessageId Uma ID usada para correlacionar a comunicação bidirecional no Hub IoT.
IoTHub.CorrelationId Uma ID usada em respostas a mensagens e comentários no Hub IoT.
IoTHub.ConnectionDeviceId A ID de autenticação usada para enviar esta mensagem. Esse valor é registrado em mensagens vinculadas ao serviço pelo Hub IoT.
IoTHub.ConnectionDeviceGenerationId A ID de geração do dispositivo autenticado que foi usado para enviar esta mensagem. Esse valor é marcado em mensagens servicebound pelo Hub IoT.
IoTHub.EnqueuedTime O horário em que o Hub IoT recebe a mensagem.

Transmitir dados do armazenamento de Blobs ou Data Lake Storage Gen2

Para cenários com grandes quantidades de dados não estruturados a serem armazenados na nuvem, o Armazenamento de Blobs do Azure ou o Azure Data Lake Storage Gen2 oferece uma solução econômica e escalonável. Os dados no armazenamento de Blobs ou no Azure Data Lake Storage Gen2 são considerados dados inativos. No entanto, esses dados podem ser processados como um fluxo de dados pelo Azure Stream Analytics.

O processamento de log é um cenário bastante usado para o uso de entradas de armazenamento de Blobs com o Stream Analytics. Nesse cenário, os arquivos de dados de telemetria são capturados de um sistema e precisam ser analisados e processados para extrair dados significativos.

O carimbo de data/hora padrão de um evento do armazenamento de Blobs ou do Azure Data Lake Storage Gen2 no Stream Analytics é o carimbo de data/hora da última modificação, que é BlobLastModifiedUtcTime. Se um blob for carregado em uma conta de armazenamento às 13:00 e o trabalho do Azure Stream Analytics for iniciado usando a opção Agora às 13:01, o blob não será selecionado, pois sua hora modificada fica fora do período de execução do trabalho.

Se um blob for carregado em um contêiner da conta de armazenamento às 13:00 e o trabalho do Azure Stream Analytics for iniciado usando a opção Agora às 13:00 ou antes, o blob não será selecionado, pois sua hora de modificação fica fora do período de execução do trabalho.

Se um trabalho do Azure Stream Analytics for iniciado usando Agora às 13:00, e um blob for carregado no contêiner da conta de armazenamento às 13h01, o Azure Stream Analytics pegará o blob. O carimbo de data/hora atribuído a cada blob é baseado apenas no BlobLastModifiedTime. A pasta em que o blob está não tem nenhuma relação com o carimbo de data/hora atribuído. Por exemplo, se houver um blob 2019/10-01/00/b1.txt com BlobLastModifiedTime de 2019-11-11, então o carimbo de data/hora atribuído a esse blob será 2019-11-11.

Para processar os dados como uma transmissão usando um carimbo de data/hora na carga do evento, você deve usar a palavra-chave TIMESTAMP BY. Um trabalho do Stream Analytics efetua pull do dados do Armazenamento de Blobs do Azure ou da entrada do Azure Data Lake Storage Gen2 a cada segundo, se o arquivo de blob estiver disponível. Se o arquivo de blob não estiver disponível, não há uma retirada exponencial com um atraso de tempo máximo de 90 segundos.

Observação

O Stream Analytics não dá suporte para a adição de conteúdo a um arquivo de blob existente. O Stream Analytics exibirá cada arquivo apenas uma vez e quaisquer alterações que ocorram no arquivo após o trabalho ter lido os dados não serão processados. A prática recomendada é carregar todos os dados para um arquivo de blob de uma vez e, em seguida, adicionar outros eventos mais recentes em um arquivo diferente, o novo arquivo de blob.

Em cenários em que muitos blobs são continuamente adicionados e o Stream Analytics está processando os blobs à medida que são adicionados, é possível que alguns sejam ignorados em raros casos devido à granularidade do BlobLastModifiedTime. Você pode mitigar esse caso carregando blobs com pelo menos dois segundos de diferença. Se essa opção não for viável, use os Hubs de Eventos para transmitir grandes volumes de eventos.

Configurar o Armazenamento de Blobs como uma entrada de dados

A tabela a seguir explica cada propriedade na página Nova entrada no portal do Azure, quando você configura o Armazenamento de Blobs como uma entrada de fluxo.

Propriedade Descrição
Alias de entrada Um nome amigável que você usa na consulta do trabalho para fazer referência a essa entrada.
Assinatura Escolha a assinatura na qual existem os recursos de armazenamento.
Conta de armazenamento O nome da conta de armazenamento em que estão localizados os arquivos de blob.
Chave de conta de armazenamento A chave secreta associada à conta de armazenamento. Essa opção é preenchida automaticamente, a menos que você selecione a opção de fornecer as configurações de forma manual.
Contêiner Contêineres fornecem um agrupamento lógico para blobs. Você pode escolher Usar contêiner existente ou Criar novo para ter um novo contêiner criado.
Modo de autenticação Especifique o tipo de autenticação que deseja usar para se conectar à conta de armazenamento. Você pode usar uma cadeia de conexão ou uma identidade gerenciada para se autenticar na conta de armazenamento. Para a opção de identidade gerenciada, é possível criar uma identidade gerenciada atribuída pelo sistema para o trabalho do Stream Analytics ou uma identidade gerenciada atribuída pelo usuário para autenticação com a conta de armazenamento. Quando você usa uma identidade gerenciada, a identidade gerenciada deve ser membro de uma função apropriada na conta de armazenamento.
Padrão do caminho (opcional) O caminho do arquivo usado para localizar os blobs no contêiner especificado. Se você quiser ler os blobs da raiz do contêiner, não defina um padrão de caminho. No caminho, você pode optar por especificar uma ou mais instâncias das três variáveis a seguir: {date}, {time} ou {partition}

Exemplo 1: cluster1/logs/{date}/{time}/{partition}

Exemplo 2: cluster1/logs/{date}

O caractere * não é um valor permitido para o prefixo de caminho. Apenas caracteres de blobs do Azure válidos são permitidos. Não inclua nomes de contêiner ou de arquivo.
Formato de data (opcional) Se você usar a variável de data no caminho, o formato de data no qual os arquivos são organizados. Exemplo: YYYY/MM/DD

Quando a entrada do blob tem {date} ou {time} no caminho, as pastas são examinadas em ordem de tempo crescente.
Formato de hora (opcional) Se você usar a variável de data no caminho, o formato de data no qual os arquivos são organizados. Atualmente, o único valor com suporte é HH para horas.
Chave de partição É um campo opcional que está disponível somente se seu trabalho estiver configurado para usar o nível de compatibilidade 1.2 ou superior. Se a entrada for particionada por uma propriedade, você poderá adicionar o nome dessa propriedade. É usado para melhorar o desempenho da sua consulta se ela incluir uma cláusula PARTITION BY ou GROUP BY nessa propriedade. Se esse trabalho usar o nível de compatibilidade 1.2 ou superior, o padrão desse campo será "PartitionId."
Contagem de partições de entrada Este campo está presente somente quando {partition} está presente no padrão de caminho. O valor da propriedade do sistema é um inteiro >=1. Sempre que {partition} aparecer em pathPattern, um número entre 0 e o valor desse campo-1 será usado.
Formato de serialização do evento O formato de serialização (JSON, CSV, Avro, Parquet ou outros (Protobuf, XML, proprietário...)) do fluxo de dados de entrada. Verifique se o formato JSON está alinhado com a especificação e não inclui um 0 à esquerda para números decimais.
Codificação Para CSV e JSON, UTF-8 é o único formato de codificação com suporte no momento.
Compactação O tipo de compactação usado para ler o fluxo de dados de entrada, como nenhum (padrão), Gzip ou Deflate.

Quando seus dados forem provenientes de uma fonte de Armazenamento de Blobs, você poderá acessar alguns campos de metadados em sua consulta do Stream Analytics:

Propriedade Descrição
BlobName O nome do blob de entrada de onde o evento veio.
EventProcessedUtcTime A data e a hora em que o Stream Analytics processa o evento.
BlobLastModifiedUtcTime A data e hora da última modificação do blob.
PartitionId A ID de partição com base em zero para o adaptador de entrada.

Por exemplo, usando esses campos, você pode escrever uma consulta como o exemplo a seguir:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Transmissão de dados do Apache Kafka

O Azure Stream Analytics permite que você se conecte diretamente aos clusters do Apache Kafka para ingerir dados. A solução é de baixo código e totalmente gerenciada pela equipe do Azure Stream Analytics na Microsoft, permitindo que ela atenda aos padrões de conformidade corporativos. A entrada do Kafka é compatível com versões anteriores e dá suporte a todas as versões com a versão mais recente do cliente a partir da versão 0.10. Os usuários podem se conectar a clusters do Kafka dentro de uma rede virtual e a clusters do Kafka com um ponto de extremidade público, dependendo das configurações. A configuração depende das convenções de configuração existentes do Kafka. Os tipos de compactação com suporte são None, Gzip, Snappy, LZ4 e Zstd.

Para obter mais informações, confira Dados de fluxo do Kafka para o Azure Stream Analytics (versão prévia).

Próximas etapas