Tutorial: Executar o Azure Functions a partir dos trabalhos do Azure Stream Analytics
Neste tutorial, você cria um trabalho do Azure Stream Analytics que lê eventos dos Hubs de Eventos do Azure, executa uma consulta nos dados do evento e invoca uma função do Azure, que grava em uma instância do Cache do Azure para Redis.
Observação
- É possível executar o Azure Functions a partir do Azure Stream Analytics configurando o Azure Functions como um dos coletores (saídas) para o trabalho do Stream Analytics. O Azure Functions é uma experiência de computação sob demanda controlada por evento que permite implementar o código que é disparado por eventos que ocorrem no Azure ou em serviços de terceiros. Essa capacidade do Azure Functions de responder a gatilhos faz dele uma saída natural para trabalhos do Stream Analytics.
- O Stream Analytics invoca o Azure Functions por meio de gatilhos HTTP. O adaptador de saída do Azure Functions permite que os usuários o conectem ao Stream Analytics, de modo que os eventos possam ser disparados com base em consultas do Stream Analytics.
- Não há suporte para a conexão com o Azure Functions em uma VNet (rede virtual) por meio de um trabalho do Stream Analytics que esteja sendo executado em um cluster multilocatário.
Neste tutorial, você aprenderá a:
- Criar uma instância dos Hubs de Eventos do Azure
- Criar uma instância de Cache do Azure para Redis
- Criar uma Função do Azure
- Criar um trabalho de Stream Analytics
- Configurar o hub de eventos como entrada e função como saída
- Executar o trabalho do Stream Analytics
- Verificar o Cache do Azure para Redis quanto aos resultados
Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
Pré-requisitos
Antes de começar, verifique se você concluiu as seguintes etapas:
- Se você não tiver uma assinatura do Azure, crie uma conta gratuita.
- Faça o download do aplicativo gerador de evento de chamada telefônicaTelcoGenerator.zip do Centro de Download da Microsoft ou obtenha o código-fonte no GitHub.
Entrar no Azure
Entre no portal do Azure.
Criar um Hub de Evento
Você deve enviar os dados de exemplo para um hub de eventos antes que o Stream Analytics possa analisar o fluxo de dados de chamadas fraudulentas. Neste tutorial, você envia dados ao Azure usando o Hubs de Eventos do Azure.
Use as seguintes etapas para criar um hub de eventos e enviar dados de chamada para ele:
Entre no portal do Azure.
Selecione Todos os serviços no menu à esquerda, selecione Internet das coisas, passe o mouse sobre Hubs de Eventos e, em seguida, selecione o botão + (Adicionar).
Na página Criar Namespace, siga estas etapas:
Selecione uma assinatura do Azure em que deseja criar o hub de eventos.
Em Grupo de recursos, escolha Criar e insira um nome para o grupo de recursos. O namespace dos Hubs de Eventos é criado neste grupo de recursos.
Em Nome do namespace, insira um nome exclusivo para o namespace dos Hubs de Eventos.
Em Localização, selecione a região na qual deseja criar o namespace.
Em Tipo de preço, selecione Standard.
Selecione Revisar + criar na parte inferior da página.
Na página Revisar + criar do assistente de criação de namespace, selecione Criar na parte inferior da página depois de revisar todas as configurações.
Depois que o namespace for implantado com êxito, selecione Ir para o recurso para navegar até a página Namespace dos Hubs de Eventos.
Na página Namespace de Hubs de Eventos, selecione +Hub de Eventos na barra de comando.
Na página Criar Hub de Eventos, insira um nome para o Hub de Eventos. Defina a Contagem de Partições como 2. Use as opções padrão nas configurações restantes e escolha Revisar + criar.
Na página Examinar + criar, selecione Criar na parte inferior da página. Aguarde até que a implantação tenha êxito.
Conceder acesso para o hub de eventos e obter uma cadeia de caracteres de conexão
Antes que um aplicativo possa enviar dados aos Hubs de Eventos do Azure, o hub de eventos deve ter uma política que permita o acesso. A política de acesso produz uma cadeia de conexão que inclui informações de autorização.
Na página Namespace dos Hubs de Eventos, selecione Políticas de acesso compartilhado no menu à esquerda.
Selecione RootManageSharedAccessKey na lista de políticas.
Então, selecione o botão Copiar ao lado de Cadeia de conexão - chave primária.
Cole a cadeia de conexão em um editor de texto. Você precisará dessa cadeia de conexão na próxima seção.
A cadeia de conexão tem esta aparência:
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
Observe que a cadeia de conexão contém vários pares de chave-valor separados por ponto e vírgula: Endpoint, SharedAccessKeyName e SharedAccessKey.
Iniciar o aplicativo gerador de evento
Antes de iniciar o aplicativo TelcoGenerator, configure-o para enviar dados para os Hubs de Eventos do Azure criados anteriormente.
Extraia o conteúdo do arquivo TelcoGenerator.zip.
Abra o arquivo
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
em um editor de texto de sua escolha Há mais de um arquivo.config
, portanto verifique se abriu o arquivo correto.Atualize o elemento
<appSettings>
no arquivo de configuração com os seguintes detalhes:- Defina o valor da chave EventHubName como o valor de EntityPath no final da cadeia de conexão.
- Defina o valor da chave Microsoft.ServiceBus.ConnectionString para a cadeia de conexão para o namespace. Se você usar uma cadeia de conexão para um hub de eventos, não um namespace, remova o valor
EntityPath
(;EntityPath=myeventhub
) no final. Não se esqueça de remover o ponto e vírgula que precede o valor de EntityPath.
Salve o arquivo.
Em seguida, abra uma janela de comando e altere para a pasta onde o aplicativo TelcoGenerator foi descompactado. Em seguida, digite o seguinte comando:
.\telcodatagen.exe 1000 0.2 2
Esse comando usa os seguintes parâmetros:
- Número de registros de dados de chamadas por hora.
- Porcentagem de probabilidade de fraude, que é a frequência com que o aplicativo deve simular uma chamada fraudulenta. O valor 0,2 significa que cerca de 20% dos registros de chamada parecem ser fraudulentos.
- Duração em horas, que é o número de horas em que o aplicativo deve ser executado. Também é possível interromper o aplicativo a qualquer momento encerrando o processo (Ctrl+C) na linha de comando.
Depois de alguns segundos, o aplicativo é iniciado exibindo registros de chamada telefônica na tela, enquanto envia para o hub de eventos. Os dados de chamadas telefônicas contêm os seguintes campos:
Registro Definição CallrecTime Carimbo de data/hora para a hora de início da chamada. SwitchNum Chave do telefone usada para se conectar à chamada. Neste exemplo, as opções são cadeias de caracteres que representam o país/região de origem (Estados Unidos, China, Reino Unido, Alemanha ou Austrália). CallingNum Número de telefone do autor da chamada. CallingIMSI A Identidade do Assinante Móvel Internacional (IMSI). É um identificador exclusivo do autor da chamada. CalledNum O número de telefone do destinatário da chamada. CalledIMSI Identidade do Assinante Móvel Internacional (IMSI). É um identificador exclusivo do destinatário da chamada.
Criar um trabalho de Stream Analytics
Agora que você tem um fluxo de eventos de chamada, pode criar um trabalho do Stream Analytics que lê dados do hub de eventos.
- Para criar um trabalho do Stream Analytics, navegue até o portal do Azure.
- Clique em Criar um recurso e pesquise um trabalho do Stream Analytics. Selecione o bloco Trabalho do Stream Analytics e selecione Criar.
- Na página Novo trabalho do Stream Analytics, siga estas etapas:
Em Assinatura, selecione a assinatura que contém o namespace dos Hubs de Eventos.
Em Grupo de recursos, escolha o grupo de recursos criados anteriormente.
Na seção Detalhes da instância, em Nome, insira um nome exclusivo para o trabalho do Stream Analytics.
Em Região, selecione a região na qual deseja criar o trabalho do Stream Analytics. Recomendamos colocar o trabalho e o hub de eventos na mesma região para melhor desempenho e para que você não precise pagar para transferir dados entre regiões.
Em Ambiente de hospedagem<, selecione Nuvem, caso ainda não esteja selecionado. Os trabalhos do Stream Analytics podem ser implantados na nuvem ou na borda. O Cloud permite que você implante no Azure Cloud, e o Edge permite que você implante em um dispositivo IoT Edge.
Em Unidades de streaming, selecione 1. As unidades de streaming representam os recursos de computação necessários para executar um trabalho. Por padrão, esse valor é definido como 1. Para saber mais sobre como dimensionar unidades de streaming, confira o artigo Entendendo e ajustando as unidades de streaming.
Selecione Revisar + criar na parte inferior da página.
- Na página Revisar + criar, revise as configurações e selecione Criar para criar um trabalho do Stream Analytics.
- Depois que o trabalho for implantado, selecione Acessar recurso para navegar até a página do trabalho do Stream Analytics.
Configurar entrada de trabalho
A próxima etapa é definir uma fonte de entrada para o trabalho ler os dados usando o hub de eventos criado na seção anterior.
Na página do trabalho do Stream Analytics, na seção Topologia do Trabalho no menu à esquerda, selecione Entradas.
Na página Entradas, selecione + Adicionar entrada e Hub de eventos.
Na página Hub de eventos, siga estas etapas:
Em Alias de entrada, insira CallStream. O alias de entrada é um nome amigável para identificar a entrada. O alias de entrada pode conter somente caracteres alfanuméricos, hifens e sublinhados e deve ter entre 3 e 63 caracteres.
Em Assinatura, selecione a assinatura do Azure em que você criou o hub de eventos. O hub de eventos pode estar na mesma assinatura ou em uma diferente da do trabalho do Stream Analytics.
Em Namespace dos Hubs de Eventos, selecione o namespace dos Hubs de Eventos criado na seção anterior. Todos os namespaces disponíveis na sua assinatura atual são listados no menu suspenso.
Em Nome do hub de eventos, selecione o hub de eventos criado na seção anterior. Todos os hubs de eventos disponíveis no namespace selecionado são listados no menu suspenso.
Em Grupo de consumidores do hub de eventos, mantenha a opção Criar selecionada para que um grupo de consumidores seja criado no hub de eventos. É recomendável usar um grupo de consumidores distinto para cada trabalho do Stream Analytics. Se nenhum grupo de consumidores for especificado, o trabalho do Stream Analytics usará o grupo de consumidores
$Default
. Quando um trabalho contém uma autojunção ou várias entradas, no futuro, algumas entradas podem ser lidas por mais de um leitor. Essa situação afeta o número de leitores em um único grupo de consumidores.Em Modo de autenticação, selecione Cadeia de conexão. É mais fácil testar o tutorial com essa opção.
Em Nome da política do hub de eventos, selecione Usar existente e escolha a política criada anteriormente.
Escolha Salvar na parte inferior da página.
Criar uma instância de Cache do Azure para Redis
Crie um cache no Cache do Azure para Redis usando as etapas descritas em Criar uma instância do Cache do Azure para Redis.
Depois de criar o cache, em Configurações, selecione Chaves de Acesso. Anote a Chave da cadeia de conexão primária.
Criar uma função no Azure Functions que pode gravar dados no Cache do Azure para Redis
Consulte a seção Criar um aplicativo de funções da documentação do Azure Functions. Este exemplo foi criado em:
Crie um aplicativo de funções HttpTrigger padrão no Visual Studio Code seguindo este tutorial. As seguintes informações são usadas: idioma:
C#
, runtime:.NET 6
(na função v4), modelo:HTTP trigger
.Instale a biblioteca de clientes do Redis executando o seguinte comando em um terminal localizado na pasta do projeto:
dotnet add package StackExchange.Redis --version 2.2.88
Adicione os itens
RedisConnectionString
eRedisDatabaseIndex
na seçãoValues
dolocal.settings.json
, preenchendo a cadeia de conexão do servidor de destino:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }
O Índice do Banco de Dados Redis é o número de 0 a 15 que identifica o banco de dados na instância.
Substitua toda a função (arquivo. cs no projeto) pelo trecho de código a seguir. Atualize o namespace, o nome da classe e o nome da função por conta própria:
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }
Quando o Stream Analytics recebe a exceção "Entidade de Solicitação HTTP muito grande" da função, ele reduz o tamanho dos lotes que envia para as funções. O código a seguir garante que o Stream Analytics não envia lotes muito grandes. Certifique-se de que os valores de contagem e tamanho máximo do lote usados na função sejam consistentes com os valores inseridos no portal do Stream Analytics.
A função agora pode ser publicada no Azure.
Abra a função no portal do Azure e defina as configurações do aplicativo para
RedisConnectionString
eRedisDatabaseIndex
.
Atualizar o trabalho do Stream Analytics com a função como saída
Abra seu trabalho do Stream Analytics no portal do Azure.
Navegue até a função e selecione Visão Geral>Saídas>Adicionar. Para adicionar uma nova saída, selecione Azure Functions como a opção de coletor. O adaptador de saída do Functions tem as seguintes propriedades:
Nome da propriedade Descrição Alias de saída Um nome amigável que você usa na consulta do trabalho para fazer referência à saída. Importar opção Você poderá usar a função da assinatura atual ou fornecer as configurações manualmente se a função estiver localizada em outra assinatura. Aplicativo de Funções Nome de seu aplicativo de funções. Função Nome da função em seu aplicativo de funções (nome da função run.csx). Tamanho Máximo do Lote Define o tamanho máximo de cada lote de saída, que é enviado para a função em bytes. Por padrão, esse valor é definido como 262.144 bytes (256 KB). Contagem Máxima do Lote Especifica o número máximo de eventos em cada lote enviado para a função. O valor padrão é 100. Essa propriedade é opcional. Chave Permite que você use uma função de outra assinatura. Forneça o valor da chave para acessar sua função. Essa propriedade é opcional. Forneça um nome para o alias de saída. Neste tutorial, ele é nomeado saop1, mas você pode usar qualquer nome de sua escolha. Preencha outros detalhes.
Abra seu trabalho do Stream Analytics e atualize a consulta para o seguinte.
Importante
O script de exemplo a seguir pressupõe que você tenha usado CallStream para o nome de entrada e saop1 para o nome de saída. Se você usou nomes diferentes, não se esqueça de atualizar a consulta.
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum
Inicie o aplicativo telcodatagen.exe executando o seguinte comando na linha de comando. O comando usa o formato
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
.telcodatagen.exe 1000 0.2 2
Inicie o trabalho do Stream Analytics.
Na página Monitorar para sua função do Azure, você visualizará que a função é invocada.
Na página Cache do Azure para Redis, selecione Métricas no menu à esquerda, adicione a métrica Gravação de Cache e defina a duração como última hora. Será visualizado o gráfico semelhante à imagem a seguir.
Verificar o Cache do Azure para Redis quanto aos resultados
Obter a chave de logs do Azure Functions
Primeiro, obtenha a chave de um registro inserido em Cache do Azure para Redis. No código, a chave é calculada na função do Azure, conforme mostrado no seguinte snippet de código:
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Navegue até o portal do Azure e localize o aplicativo Azure Functions.
Selecione Funções no menu esquerdo.
Selecione HTTPTrigger1 na lista de funções.
Selecione Monitor no menu à esquerda.
Alterne para a guia Logs.
Anote uma chave da mensagem informativa, conforme mostrado na captura de tela a seguir. Use essa chave para localizar o valor em Cache do Azure para Redis.
Use a chave para localizar o registro em Cache do Azure para Redis
Navegue até o portal do Azure e localize seu Cache do Azure para Redis. Selecione Console.
Use Comandos do Cache do Azure para Redis para verificar se seus dados estão no Cache do Azure para Redis. (O comando usa o formato Get {key}.) Use a chave copiada dos logs do Monitor para a função do Azure (na seção anterior).
Get "KEY-FROM-THE-PREVIOUS-SECTION"
Este comando deve imprimir o valor para a chave especificada:
Tratamento de erros e novas tentativas
Se ocorrer uma falha durante o envio de eventos para o Azure Functions, o Stream Analytics repetirá a maioria das operações. Todas as exceções http serão repetidas até serem bem-sucedidas, exceto para o erro http 413 (entidade grande demais). Um erro de entidade grande demais é tratado como um erro de dados que está sujeito à política de repetição ou remoção.
Observação
O tempo limite para solicitações HTTP do Stream Analytics para Azure Functions é definido como 100 segundos. Se o aplicativo do Azure Functions levar mais de 100 segundos para processar um lote, ocorrerá um erro no Stream Analytics e ele tentará executar novamente o lote.
A repetição de tempos limite pode resultar em eventos duplicados gravados no coletor de saída. Quando o Stream Analytics tenta executar novamente um lote com falha, ele tenta todos os eventos no lote. Por exemplo, considere um lote de 20 eventos que são enviados para o Azure Functions do Stream Analytics. Suponha que o Azure Functions leve 100 segundos para processar os 10 primeiros eventos nesse lote. Após os 100 segundos, o Stream Analytics suspende a solicitação, uma vez que não recebeu uma resposta positiva do Azure Functions, e outra solicitação é enviada para o mesmo lote. Os 10 primeiros eventos no lote são processados novamente pelo Azure Functions, o que causa uma duplicata.
Problemas conhecidos
No portal do Azure, quando você tenta redefinir o valor de Tamanho máximo do lote/Contagem máxima de lotes como vazio (padrão), o valor é alterado para o valor inserido anteriormente ao salvar. Nesse caso, insira manualmente os valores padrão para esses campos.
Atualmente, o uso de roteiros HTTP em seu Azure Functions não é compatível com o Stream Analytics.
O suporte para se conectar ao Azure Functions hospedado em uma rede virtual não está habilitado.
Limpar os recursos
Quando não forem mais necessários, exclua o grupo de recursos, o trabalho de streaming e todos os recursos relacionados. A exclusão do trabalho evita a cobrança das unidades de streaming consumidas por ele. Se você está planejando usar o trabalho no futuro, pode interrompê-lo e reiniciar mais tarde, quando necessário. Se você não pretende continuar usando este trabalho, exclua todos os recursos criados por este início rápido seguindo estas etapas:
- No menu à esquerda no Portal do Azure, selecione Grupos de recursos e selecione o nome do recurso criado.
- Em sua página de grupo de recursos, selecione Excluir, digite o nome do recurso para excluir na caixa de texto e selecione Excluir.
Próximas etapas
Neste tutorial, você criou um trabalho simples do Stream Analytics que executa uma função do Azure. Para saber mais sobre trabalhos do Stream Analytics, prossiga para o seguinte tutorial: