Transmitir dados como entrada no Stream Analytics
O Stream Analytics tem integração de primeira classe com fluxos de dados do Azure como entradas de quatro tipos de recursos:
- Hubs de Eventos do Azure
- Hub IoT do Azure
- Armazenamento de Blobs do Azure
- Azure Data Lake Storage Gen2 (Armazenamento do Azure Data Lake Gen2)
Esses recursos de entrada podem viver na mesma assinatura do Azure que seu trabalho do Stream Analytics ou em uma assinatura diferente.
Compressão
O Stream Analytics suporta compactação para todas as fontes de entrada. Os tipos de compressão suportados são: Nenhum, Gzip e Deflate. O suporte para compactação não está disponível para dados de referência. Se os dados de entrada forem dados Avro compactados, o Stream Analytics os tratará de forma transparente. Não é necessário especificar o tipo de compactação com a serialização Avro.
Criar, editar ou testar entradas
Você pode usar o portal do Azure, o Visual Studio e o Visual Studio Code para adicionar e exibir ou editar entradas existentes em seu trabalho de streaming. Você também pode testar conexões de entrada e consultas de teste de dados de exemplo do portal do Azure, Visual Studio e Visual Studio Code. Ao escrever uma consulta, você lista a entrada na cláusula FROM. Você pode obter a lista de entradas disponíveis na página Consulta no portal. Se você deseja usar várias entradas, JOIN
elas ou escrever várias SELECT
consultas.
Nota
É altamente recomendável que você use as ferramentas do Stream Analytics para Visual Studio Code para obter a melhor experiência de desenvolvimento local. Há lacunas de recursos conhecidas nas ferramentas do Stream Analytics para Visual Studio 2019 (versão 2.6.3000.0) e ele não será melhorado no futuro.
Transmitir dados a partir dos Hubs de Eventos
Os Hubs de Eventos do Azure são um ingestor de eventos de publicação-subscrição altamente escalável. Um hub de eventos pode coletar milhões de eventos por segundo para que você possa processar e analisar as enormes quantidades de dados produzidos por seus dispositivos e aplicativos conectados. Juntos, os Hubs de Eventos e o Stream Analytics podem fornecer uma solução completa para análises em tempo real. Os Hubs de Eventos permitem alimentar eventos no Azure em tempo real, e os trabalhos do Stream Analytics podem processar esses eventos em tempo real. Por exemplo, você pode enviar cliques na Web, leituras de sensores ou eventos de log online para Hubs de Eventos. Em seguida, você pode criar trabalhos do Stream Analytics para usar Hubs de Eventos para os dados de entrada para filtragem, agregação e correlação em tempo real.
EventEnqueuedUtcTime
é o carimbo de data/hora da chegada de um evento a um hub de eventos e é o carimbo de data/hora padrão de eventos provenientes de Hubs de Eventos para o Stream Analytics. Para processar os dados como um fluxo usando um carimbo de data/hora na carga útil do evento, você deve usar a palavra-chave TIMESTAMP BY .
Hubs de Eventos Grupos de consumidores
Você deve configurar cada entrada do hub de eventos para ter seu próprio grupo de consumidores. Quando um trabalho contém uma associação automática ou tem várias entradas, algumas entradas podem ser lidas por mais de um leitor a jusante. Esta situação tem impacto no número de leitores num único grupo de consumidores. Para evitar exceder o limite de Hubs de Eventos de cinco leitores por grupo de consumidores por partição, é uma prática recomendada designar um grupo de consumidores para cada trabalho do Stream Analytics. Há também um limite de 20 grupos de consumidores para um hub de eventos de camada Standard. Para obter mais informações, consulte Solucionar problemas de entradas do Azure Stream Analytics.
Criar uma entrada a partir de Hubs de Eventos
A tabela a seguir explica cada propriedade na página Nova entrada no portal do Azure para transmitir a entrada de dados de um hub de eventos:
Property | Description |
---|---|
Alias de entrada | Um nome amigável que você usa na consulta do trabalho para fazer referência a essa entrada. |
Subscrição | Escolha a assinatura do Azure na qual o recurso do hub de eventos existe. |
Espaço de nomes do Hub de Eventos | O namespace Hubs de Eventos é um contêiner para hubs de eventos. Ao criar um hub de eventos, você também cria o namespace. |
Nome do Hub de Eventos | O nome do hub de eventos a ser usado como entrada. |
Grupo de consumidores do Hub de Eventos (recomendado) | Recomendamos que você use um grupo de consumidores distinto para cada trabalho do Stream Analytics. Essa cadeia de caracteres identifica o grupo de consumidores a ser usado para ingerir dados do hub de eventos. Se nenhum grupo de consumidores for especificado, o trabalho do Stream Analytics usará o grupo de $Default consumidores. |
Modo de autenticação | Especifique o tipo de autenticação que você deseja usar para se conectar ao hub de eventos. Você pode usar uma cadeia de conexão ou uma identidade gerenciada para autenticar com o hub de eventos. Para a opção de identidade gerenciada, você pode criar uma identidade gerenciada atribuída pelo sistema para o trabalho do Stream Analytics ou uma identidade gerenciada atribuída pelo usuário para autenticar com o hub de eventos. Quando você usa uma identidade gerenciada, a identidade gerenciada deve ser membro das funções Recetor de Dados dos Hubs de Eventos do Azure ou Proprietário de Dados dos Hubs de Eventos do Azure. |
Nome da política do Hub de Eventos | A política de acesso compartilhado que fornece acesso aos Hubs de Eventos. Cada política de acesso compartilhado tem um nome, permissões definidas e chaves de acesso. Essa opção é preenchida automaticamente, a menos que você selecione a opção para fornecer as configurações dos Hubs de Eventos manualmente. |
Chave de partição | É um campo opcional que só está disponível se o seu trabalho estiver configurado para utilizar o nível de compatibilidade 1.2 ou superior. Se a sua entrada for particionada por uma propriedade, você pode adicionar o nome dessa propriedade aqui. Ele é usado para melhorar o desempenho da sua consulta se incluir uma PARTITION BY cláusula ou GROUP BY nesta propriedade. Se este trabalho utilizar o nível de compatibilidade 1.2 ou superior, este campo assume como predefinição PartitionId. |
Formato de serialização de eventos | O formato de serialização (JSON, CSV, Avro, Parquet ou Outro (Protobuf, XML, proprietário...)) do fluxo de dados de entrada. Verifique se o formato JSON está alinhado com a especificação e não inclui 0 à esquerda para números decimais. |
Encoding (Codificação) | UTF-8 é atualmente o único formato de codificação suportado. |
Tipo de compressão de eventos | O tipo de compactação usado para ler o fluxo de dados de entrada, como Nenhum (padrão), Gzip ou Esvaziar. |
Registro de esquema (visualização) | Você pode selecionar o registro de esquema com esquemas para dados de eventos recebidos do hub de eventos. |
Quando seus dados vêm de uma entrada de fluxo de Hubs de Eventos, você tem acesso aos seguintes campos de metadados em sua consulta do Stream Analytics:
Property | Description |
---|---|
EventProcessedUtcTime | A data e a hora em que o Stream Analytics processa o evento. |
EventEnqueuedUtcTime | A data e a hora em que os Hubs de Eventos recebem os eventos. |
PartitionId | O ID de partição baseado em zero para o adaptador de entrada. |
Por exemplo, usando esses campos, você pode escrever uma consulta como o exemplo a seguir:
SELECT
EventProcessedUtcTime,
EventEnqueuedUtcTime,
PartitionId
FROM Input
Nota
Ao usar Hubs de Eventos como um ponto de extremidade para Rotas do Hub IoT, você pode acessar os metadados do Hub IoT usando a função GetMetadataPropertyValue.
Transmitir dados do Hub IoT
O Hub IoT do Azure é um ingestor de eventos de publicação-assinatura altamente escalável otimizado para cenários de IoT.
O carimbo de data/hora padrão de eventos provenientes de um Hub IoT no Stream Analytics é o carimbo de data/hora em que o evento chegou ao Hub IoT, que é EventEnqueuedUtcTime
. Para processar os dados como um fluxo usando um carimbo de data/hora na carga útil do evento, você deve usar a palavra-chave TIMESTAMP BY .
Grupos de consumidores do Iot Hub
Você deve configurar cada entrada do Hub IoT do Stream Analytics para ter seu próprio grupo de consumidores. Quando um trabalho contém uma associação automática ou quando tem várias entradas, alguma entrada pode ser lida por mais de um leitor a jusante. Esta situação tem impacto no número de leitores num único grupo de consumidores. Para evitar exceder o limite do Hub IoT do Azure de cinco leitores por grupo de consumidores por partição, é uma prática recomendada designar um grupo de consumidores para cada trabalho do Stream Analytics.
Configurar um Hub IoT como uma entrada de fluxo de dados
A tabela a seguir explica cada propriedade na página Nova entrada no portal do Azure quando você configura um Hub IoT como uma entrada de fluxo.
Property | Description |
---|---|
Alias de entrada | Um nome amigável que você usa na consulta do trabalho para fazer referência a essa entrada. |
Subscrição | Escolha a assinatura na qual o recurso do Hub IoT existe. |
Hub IoT | O nome do Hub IoT a ser usado como entrada. |
Grupo de consumidores | Recomendamos que você use um grupo de consumidores diferente para cada trabalho do Stream Analytics. O grupo de consumidores é usado para ingerir dados do Hub IoT. O Stream Analytics usa o $Default grupo de consumidores, a menos que você especifique o contrário. |
Nome da política de acesso compartilhado | A política de acesso compartilhado que fornece acesso ao Hub IoT. Cada política de acesso compartilhado tem um nome, permissões definidas e chaves de acesso. |
Chave de política de acesso compartilhado | A chave de acesso compartilhada usada para autorizar o acesso ao Hub IoT. Essa opção é preenchida automaticamente, a menos que você selecione a opção para fornecer as configurações do Iot Hub manualmente. |
Ponto final | O ponto de extremidade para o Hub IoT. |
Chave de partição | É um campo opcional que só está disponível se o seu trabalho estiver configurado para utilizar o nível de compatibilidade 1.2 ou superior. Se a sua entrada for particionada por uma propriedade, você pode adicionar o nome dessa propriedade aqui. Ele é usado para melhorar o desempenho da sua consulta se incluir uma cláusula PARTITION BY ou GROUP BY nesta propriedade. Se este trabalho usar o nível de compatibilidade 1.2 ou superior, esse campo assumirá como padrão "PartitionId". |
Formato de serialização de eventos | O formato de serialização (JSON, CSV, Avro, Parquet ou Outro (Protobuf, XML, proprietário...)) do fluxo de dados de entrada. Verifique se o formato JSON está alinhado com a especificação e não inclui 0 à esquerda para números decimais. |
Encoding (Codificação) | UTF-8 é atualmente o único formato de codificação suportado. |
Tipo de compressão de eventos | O tipo de compactação usado para ler o fluxo de dados de entrada, como Nenhum (padrão), Gzip ou Esvaziar. |
Ao usar dados de fluxo de um Hub IoT, você tem acesso aos seguintes campos de metadados na consulta do Stream Analytics:
Property | Description |
---|---|
EventProcessedUtcTime | A data e a hora em que o evento foi processado. |
EventEnqueuedUtcTime | A data e a hora em que o Hub IoT recebe o evento. |
PartitionId | O ID de partição baseado em zero para o adaptador de entrada. |
IoTHub.MessageId | Um ID usado para correlacionar a comunicação bidirecional no Hub IoT. |
IoTHub.CorrelationId | Um ID que é usado em respostas de mensagens e comentários no Hub IoT. |
IoTHub.ConnectionDeviceId | O ID de autenticação usado para enviar esta mensagem. Esse valor é carimbado em mensagens vinculadas ao serviço pelo Hub IoT. |
IoTHub.ConnectionDeviceGenerationId | A ID de geração do dispositivo autenticado que foi usado para enviar esta mensagem. Esse valor é carimbado em mensagens vinculadas ao serviço pelo Hub IoT. |
IoTHub.EnqueuedTime | A hora em que o Hub IoT recebe a mensagem. |
Transmita dados do armazenamento de Blob ou do Data Lake Storage Gen2
Para cenários com grandes quantidades de dados não estruturados para armazenar na nuvem, o armazenamento de Blob do Azure ou o Azure Data Lake Storage Gen2 oferece uma solução econômica e escalável. Os dados no armazenamento de Blob ou no Azure Data Lake Storage Gen2 são considerados dados em repouso. No entanto, esses dados podem ser processados como um fluxo de dados pelo Stream Analytics.
O processamento de logs é um cenário comumente usado para usar essas entradas com o Stream Analytics. Nesse cenário, os arquivos de dados de telemetria são capturados de um sistema e precisam ser analisados e processados para extrair dados significativos.
O carimbo de data/hora padrão de um armazenamento de Blob ou evento do Azure Data Lake Storage Gen2 no Stream Analytics é o carimbo de data/hora que foi modificado pela última vez, que é BlobLastModifiedUtcTime
. Se um blob for carregado em uma conta de armazenamento às 13:00 e o trabalho do Azure Stream Analytics for iniciado usando a opção Agora às 13:01, ele não será coletado, pois seu tempo modificado está fora do período de execução do trabalho.
Se um blob for carregado em um contêiner de conta de armazenamento às 13:00 e o trabalho do Azure Stream Analytics for iniciado usando a Hora Personalizada às 13:00 ou antes, o blob será coletado à medida que seu tempo modificado cair dentro do período de execução do trabalho.
Se um trabalho do Azure Stream Analytics for iniciado usando Agora às 13:00 e um blob for carregado no contêiner da conta de armazenamento às 13:01, o Azure Stream Analytics pegará o blob. O carimbo de data/hora atribuído a cada blob é baseado apenas em BlobLastModifiedTime
. A pasta em que o blob está não tem relação com o carimbo de data/hora atribuído. Por exemplo, se houver um blob 2019/10-01/00/b1.txt
com um BlobLastModifiedTime
de , o carimbo de 2019-11-11
data/hora atribuído a esse blob será 2019-11-11
.
Para processar os dados como um fluxo usando um carimbo de data/hora na carga útil do evento, você deve usar a palavra-chave TIMESTAMP BY . Um trabalho do Stream Analytics extrai dados do armazenamento de Blob do Azure ou da entrada do Azure Data Lake Storage Gen2 a cada segundo se o arquivo de blob estiver disponível. Se o arquivo blob não estiver disponível, haverá um backoff exponencial com um atraso máximo de 90 segundos.
Nota
O Stream Analytics não suporta a adição de conteúdo a um arquivo de blob existente. O Stream Analytics visualizará cada arquivo apenas uma vez, e quaisquer alterações que ocorram no arquivo após o trabalho ter lido os dados não serão processadas. A prática recomendada é carregar todos os dados de um arquivo de blob de uma só vez e, em seguida, adicionar eventos mais recentes a um novo arquivo de blob diferente.
Em cenários em que muitos blobs são adicionados continuamente e o Stream Analytics está processando os blobs à medida que são adicionados, é possível que alguns blobs sejam ignorados em casos raros devido à granularidade do BlobLastModifiedTime
. Você pode atenuar esse caso carregando blobs com pelo menos dois segundos de intervalo. Se essa opção não for viável, você poderá usar Hubs de Eventos para transmitir grandes volumes de eventos.
Configurar o armazenamento de Blob como uma entrada de fluxo
A tabela a seguir explica cada propriedade na página Nova entrada no portal do Azure quando você configura o armazenamento de Blob como uma entrada de fluxo.
Property | Description |
---|---|
Alias de entrada | Um nome amigável que você usa na consulta do trabalho para fazer referência a essa entrada. |
Subscrição | Escolha a assinatura na qual o recurso de armazenamento existe. |
Conta de armazenamento | O nome da conta de armazenamento onde os arquivos de blob estão localizados. |
Chave da conta de armazenamento | A chave secreta associada à conta de armazenamento. Esta opção é preenchida automaticamente, a menos que você selecione a opção para fornecer as configurações manualmente. |
Container (Contentor) | Os contêineres fornecem um agrupamento lógico para blobs. Você pode escolher Usar contêiner existente ou Criar novo para ter um novo contêiner criado. |
Modo de autenticação | Especifique o tipo de autenticação que você deseja usar para se conectar à conta de armazenamento. Você pode usar uma cadeia de conexão ou uma identidade gerenciada para autenticar com a conta de armazenamento. Para a opção de identidade gerenciada, você pode criar uma identidade gerenciada atribuída ao sistema para o trabalho do Stream Analytics ou uma identidade gerenciada atribuída pelo usuário para autenticar com a conta de armazenamento. Quando você usa uma identidade gerenciada, a identidade gerenciada deve ser membro de uma função apropriada na conta de armazenamento. |
Padrão de caminho (opcional) | O caminho do arquivo usado para localizar os blobs dentro do contêiner especificado. Se você quiser ler blobs da raiz do contêiner, não defina um padrão de caminho. Dentro do caminho, você pode especificar uma ou mais instâncias das três variáveis a seguir: {date} , {time} , ou {partition} Exemplo 1: cluster1/logs/{date}/{time}/{partition} Exemplo 2: cluster1/logs/{date} O * caractere não é um valor permitido para o prefixo do caminho. Somente caracteres de blob válidos do Azure são permitidos. Não inclua nomes de contêineres ou nomes de arquivos. |
Formato de data (opcional) | Se você usar a variável de data no caminho, o formato de data no qual os arquivos são organizados. Exemplo: YYYY/MM/DD Quando a entrada de blob tem {date} ou {time} em seu caminho, as pastas são examinadas em ordem de tempo crescente. |
Formato de hora (opcional) | Se você usar a variável de tempo no caminho, o formato de hora no qual os arquivos são organizados. Atualmente, o único valor suportado é HH por horas. |
Chave de partição | É um campo opcional que só está disponível se o seu trabalho estiver configurado para utilizar o nível de compatibilidade 1.2 ou superior. Se a sua entrada for particionada por uma propriedade, você pode adicionar o nome dessa propriedade aqui. Ele é usado para melhorar o desempenho da sua consulta se incluir uma cláusula PARTITION BY ou GROUP BY nesta propriedade. Se este trabalho usar o nível de compatibilidade 1.2 ou superior, esse campo assumirá como padrão "PartitionId". |
Contagem de partições de entrada | Este campo está presente somente quando {partition} está presente no padrão de caminho. O valor dessa propriedade é um inteiro >=1. Sempre que {partition} aparecer em pathPattern, um número entre 0 e o valor deste campo -1 será usado. |
Formato de serialização de eventos | O formato de serialização (JSON, CSV, Avro, Parquet ou Outro (Protobuf, XML, proprietário...)) do fluxo de dados de entrada. Verifique se o formato JSON está alinhado com a especificação e não inclui 0 à esquerda para números decimais. |
Encoding (Codificação) | Para CSV e JSON, UTF-8 é atualmente o único formato de codificação suportado. |
Compressão | O tipo de compactação usado para ler o fluxo de dados de entrada, como Nenhum (padrão), Gzip ou Esvaziar. |
Quando seus dados vêm de uma fonte de armazenamento de Blob, você tem acesso aos seguintes campos de metadados em sua consulta do Stream Analytics:
Property | Description |
---|---|
BlobName | O nome do blob de entrada do qual o evento veio. |
EventProcessedUtcTime | A data e a hora em que o Stream Analytics processa o evento. |
BlobLastModifiedUtcTime | A data e a hora em que o blob foi modificado pela última vez. |
PartitionId | O ID de partição baseado em zero para o adaptador de entrada. |
Por exemplo, usando esses campos, você pode escrever uma consulta como o exemplo a seguir:
SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime
FROM Input
Transmitir dados do Apache Kafka
O Azure Stream Analytics permite que você se conecte diretamente a clusters Apache Kafka para ingerir dados. A solução é low code e totalmente gerenciada pela equipe do Azure Stream Analytics na Microsoft, permitindo que ela atenda aos padrões de conformidade de negócios. A entrada Kafka é compatível com versões anteriores e suporta todas as versões com a versão mais recente do cliente a partir da versão 0.10. Os usuários podem se conectar a clusters Kafka dentro de uma rede virtual e clusters Kafka com um ponto de extremidade público, dependendo das configurações. A configuração baseia-se nas convenções de configuração Kafka existentes. Os tipos de compressão suportados são None, Gzip, Snappy, LZ4 e Zstd.
Para obter mais informações, consulte Transmitir dados do Kafka para o Azure Stream Analytics (Visualização).