Tutorial: Ejecución de Azure Functions desde trabajos de Azure Stream Analytics
En este tutorial, creará un trabajo de Azure Stream Analytics que lee eventos de Azure Event Hubs, ejecuta una consulta en los datos del evento y a continuación, invoca una función de Azure, que escribe en una instancia de Azure Cache for Redis.
Nota:
- Puede ejecutar Azure Functions desde Azure Stream Analytics mediante la configuración de Functions como uno de los receptores (salidas) para el trabajo de Stream Analytics. Functions es una experiencia de procesos a petición orientada a eventos que permite implementar el código desencadenado por los eventos que se producen en servicios de Azure o en servicios de terceros. Esta capacidad que tiene Functions para responder a los desencadenadores hace que sea una salida natural para los trabajos de Stream Analytics.
- Stream Analytics llama a Functions mediante desencadenadores HTTP. El adaptador de salida de Functions permite a los usuarios conectar instancias de Functions a Stream Analytics, de manera que los eventos se puedan desencadenar basándose en consultas de Stream Analytics.
- No de admite la conexión a Azure Functions dentro de una red virtual (VNet) desde un trabajo de Stream Analytics que se ejecute en un clúster de varios inquilinos.
En este tutorial, aprenderá a:
- Creación de una instancia de Azure Event Hubs
- Creación de una instancia de Azure Redis Cache
- Creación de una Función de Azure
- Creación de un trabajo de Stream Analytics
- Configuración del centro de eventos como entrada y función como salida
- Ejecución del trabajo de Stream Analytics
- Búsqueda de resultados en Azure Redis Cache
Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.
Requisitos previos
Antes de empezar, asegúrese de que ha completado los pasos siguientes:
- Si no tiene una suscripción a Azure, cree una cuenta gratuita.
- Descargue la aplicación de generación de eventos de llamada telefónica, TelcoGenerator.zip del Centro de descarga de Microsoft u obtenga el código fuente de GitHub.
Inicio de sesión en Azure
Inicie sesión en Azure Portal.
Creación de un centro de eventos
Debe enviar algunos datos de ejemplo a un centro de eventos para que Stream Analytics pueda analizar el flujo de datos de llamadas fraudulentas. En este tutorial, enviará los datos a Azure mediante Azure Event Hubs.
Siga estos pasos para crear un centro de eventos y enviarle datos de llamada:
Inicie sesión en Azure Portal.
Seleccione Todos los servicios en el menú de la izquierda. Después, seleccione Internet de las cosas. Luego, coloque el mouse sobre Event Hubs y, a continuación, seleccione el botón + (Agregar).
En la página Crear espacio de nombres, siga estos pasos:
Seleccione una suscripción de Azure donde quiere crear el centro de eventos.
En Grupo de recursos, seleccione Crear nuevo y escriba un nombre para el grupo de recursos. En este grupo de recursos, se crea el espacio de nombres de Event Hubs.
En Nombre de espacio de nombres, escriba un nombre único para el espacio de nombres de Event Hubs.
En Ubicación, seleccione la región en la que desea crear el espacio de nombres.
En Plan de tarifa, seleccione Estándar.
En la parte inferior de la página, seleccione Revisar y crear.
En la página Revisar y crear del Asistente para la creación de espacios de nombres, seleccione Crear en la parte inferior de la página después de revisar todos los valores.
Una vez implementado correctamente el espacio de nombres, seleccione Ir al recurso para ir a la página Espacio de nombres de Event Hubs.
En la página Espacio de nombres de Event Hubs, seleccione +Event Hubs en la barra de comandos.
En la página Crear centro de eventos, escriba unNombre para el centro de eventos. En Recuento de particiones, seleccione el valor 2. Use las opciones predeterminadas en el resto de los valores y seleccione Revisar y crear.
En la página Revisar y crear, seleccione Crear en la parte inferior de la página. Espere a que la implementación se realice correctamente.
Concesión de acceso al centro de eventos y obtención de una cadena de conexión
Para que una aplicación pueda enviar datos a Azure Event Hubs, el centro de eventos debe tener una directiva que permita el acceso. La directiva de acceso genera una cadena de conexión que incluye la información de autorización.
En la página Espacio de nombres de Event Hubs, seleccione Directivas de acceso compartido en el menú de la izquierda.
Seleccione RootManageSharedAccessKey de la lista de directivas.
Después, seleccione el botón Copiar a lado de Cadena de conexión: clave principal.
Pegue la cadena de conexión en un editor de texto. Necesitará esta cadena de conexión en la sección siguiente.
La cadena de conexión tiene el siguiente aspecto:
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
Tenga en cuenta que la cadena de conexión contiene varios pares clave-valor separados por punto y coma: Endpoint, SharedAccessKeyName y SharedAccessKey.
Inicio de la aplicación de generación de eventos
Antes de iniciar la aplicación TelcoGenerator, debe configurarla para enviar datos a los centros de eventos de Azure que creó anteriormente.
Extraiga el contenido del archivo TelcoGenerator.zip.
Abra el archivo
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
en el editor de texto de su elección. Hay más de un archivo.config
, por lo que debe asegurarse de abrir el correcto.Actualice el elemento
<appSettings>
en el archivo de configuración con los detalles siguientes:- Establezca el valor de la clave EventHubName en el valor de EntityPath al final de la cadena de conexión.
- Establezca el valor de la clave Microsoft.ServiceBus.ConnectionString en la cadena de conexión en el espacio de nombres. Si usa una cadena de conexión a un centro de eventos, no un espacio de nombres, quite el valor
EntityPath
(;EntityPath=myeventhub
) al final. No olvide eliminar el punto y coma que precede al valor EntityPath.
Guarde el archivo.
A continuación abra una ventana de comandos y cambie a la carpeta en la que ha descomprimido la aplicación TelcoGenerator. Después, escriba el comando siguiente:
.\telcodatagen.exe 1000 0.2 2
Este comando toma los parámetros siguientes:
- Número de registros de datos de llamada por hora.
- Porcentaje de probabilidad de fraude, que es la frecuencia con la que la aplicación debe simular una llamada fraudulenta. El valor 0,2 indica que aproximadamente el 20 % de los registros de llamada tienen un aspecto fraudulento.
- Duración en horas, que es el número de horas que debe ejecutarse la aplicación. La aplicación se puede detener en cualquier momento. Para ello, solo hay que finalizar el proceso (Ctrl+C) en la línea de comandos.
Después de unos segundos, la aplicación comienza a mostrar los registros de llamada telefónica en la pantalla tal y como los envía al centro de eventos. Los datos de llamada de teléfono contienen los siguientes campos:
Registro Definición CallrecTime Marca de tiempo para la hora de inicio de la llamada. SwitchNum Conmutador de teléfono que se usa para conectar la llamada. En este ejemplo, los conmutadores son cadenas que representan el país o la región de origen (Estados Unidos, China, Reino Unido, Alemania o Australia). CallingNum Número de teléfono del autor de la llamada. CallingIMSI Identidad del suscriptor móvil internacional (IMSI). Identificador único del autor de la llamada. CalledNum Número de teléfono del destinatario de la llamada. CalledIMSI Identidad del suscriptor móvil internacional (IMSI). Identificador único del destinatario de la llamada.
Creación de un trabajo de Stream Analytics
Ahora que tiene un flujo de eventos de llamada, puede crear un trabajo de Stream Analytics que lea los datos desde el centro de eventos.
- Para crear un trabajo de Stream Analytics, vaya a Azure Portal.
- Seleccione Crear un recurso y busque Trabajo de Stream Analytics. Seleccione el icono Trabajo de Stream Analytics y haga clic en Crear.
- En la página Nuevo trabajo de Stream Analytics, siga estos pasos:
En Suscripción, seleccione la suscripción que contiene el espacio de nombres de Event Hubs.
En Grupo de recursos, seleccione el grupo de recursos que ha creado anteriormente.
En el campo Nombre de la sección Detalles de la instancia, escriba un nombre único para el trabajo de Stream Analytics.
En Región, seleccione la región en la que desea crear el trabajo de Stream Analytics. Para obtener un rendimiento óptimo, recomendamos ubicar el trabajo y el centro de eventos en la misma región de modo que no deba pagar por transferir datos de una región a otra.
En Entorno de hospedaje<, seleccione la opción Nube si aún no está seleccionada. Los trabajos de Stream Analytics se pueden implementar en la nube o en dispositivos perimetrales. Nube le permite implementar en Azure Cloud y Edge le permite implementar en un dispositivo IoT Edge.
En Unidades de streaming, seleccione 1. Las unidades de streaming representan los recursos informáticos que se necesitan para ejecutar un trabajo. De forma predeterminada, este valor se establece en 1. Para información sobre el escalado de unidades de streaming, consulte el artículo Descripción y ajuste de las unidades de streaming.
En la parte inferior de la página, seleccione Revisar y crear.
- En la página Revisar y crear, revise los valores y seleccione Crear para crear un trabajo de Stream Analytics.
- Tras la implementación del trabajo, seleccione Ir al recurso para ir a la página Trabajo de Stream Analytics.
Configuración de la entrada del trabajo
El siguiente paso es definir un origen de entrada para que el trabajo lea los datos mediante el centro de eventos que creó en la sección anterior.
En la página Trabajo de Stream Analytics, en la sección Topología de trabajo del menú izquierdo, seleccione Entradas.
En la página Entradas, seleccione +Agregar entrada y Centro de eventos.
En la página Centro de eventos, siga estos pasos:
En Alias de entrada, escriba Transmisión_de_llamada. El alias de entrada es un nombre descriptivo para identificar la entrada. El alias de entrada debe tener una longitud de entre 3 y 63 caracteres, y solo puede incluir caracteres alfanuméricos, guiones y guiones bajos.
En Suscripción, seleccione la suscripción de Azure en la que ha creado el centro de eventos. El centro de eventos puede estar en la misma suscripción que el trabajo de Stream Analytics, o en otra diferente.
En Espacio de nombres de Event Hubs, seleccione el espacio de nombres de Event Hubs que ha creado en la sección anterior. Todos los espacios de nombres disponibles en su suscripción actual se muestran en la lista desplegable.
En Nombre del centro de eventos, seleccione el centro de eventos que ha creado en la sección anterior. Todos los centros de eventos disponibles en el espacio de nombres seleccionado se muestran en la lista desplegable.
Para el grupo de consumidores del centro de eventos, mantenga seleccionada la opción Crear para que se cree un grupo de consumidores en el centro de eventos. Se recomienda usar un grupo de consumidores distinto para cada trabajo de Stream Analytics. Si no se especifica ningún grupo de consumidores, el trabajo de Stream Analytics usa el grupo de consumidores
$Default
. Cuando un trabajo contiene una autocombinación o tiene varias entradas, es posible que algunas entradas se lean luego por más de un lector. Esta situación afecta al número de lectores de un solo grupo de consumidores.En Modo de autenticación, seleccione Cadena de conexión. Es más fácil probar el tutorial con esta opción.
Para el nombre de directiva del centro de eventos, seleccione Usar existente y, después, seleccione la directiva que ha creado antes.
Seleccione Guardar en la parte inferior de la página.
Creación de una instancia de Azure Redis Cache
Cree una caché en Azure Redis Cache mediante los pasos descritos en Creación de una instancia de Azure Cache for Redis.
Después de crear la caché, en Configuración, seleccione Teclas de acceso. Anote la cadena de conexión principal.
Creación de una función en Azure Functions que pueda escribir datos en Azure Redis Cache
Consulte la sección Creación de una aplicación de función de la documentación de Functions. Este ejemplo se ha basado en:
Cree una aplicación de funciones HttpTrigger predeterminada en Visual Studio Code con este tutorial. Se va a usar la información siguiente: lenguaje:
C#
, runtime:.NET 6
(con Functions v4), plantilla:HTTP trigger
.Instale la biblioteca cliente de Redis mediante la ejecución del siguiente comando en un terminal ubicado en la carpeta del proyecto:
dotnet add package StackExchange.Redis --version 2.2.88
Agregue los elementos
RedisConnectionString
yRedisDatabaseIndex
en la secciónValues
delocal.settings.json
y rellene la cadena de conexión del servidor de destino:{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }
El índice de base de datos de Redis es el número de 0 a 15 que identifica la base de datos en la instancia.
Reemplace toda la función (archivo .cs del proyecto) por el siguiente fragmento de código. Actualice el espacio de nombres, el nombre de clase y el nombre de función por los suyos:
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }
Cuando Stream Analytics recibe la excepción "La entidad de solicitud HTTP es demasiado grande" de la función, reduce el tamaño de los lotes que envía a las funciones. El código siguiente garantiza que Stream Analytics no envía lotes demasiado grandes. Asegúrese de que los valores de tamaño y número máximo de lotes que se usan en la función sean coherentes con los valores que se ingresan en el portal de Stream Analytics.
La función ya se puede publicar en Azure.
Abra la función en Azure Portal y establezca los valores de la aplicación de
RedisConnectionString
yRedisDatabaseIndex
.
Actualización del trabajo de Stream Analytics con la función como salida
En Azure Portal, abra el trabajo de Stream Analytics.
Vaya a la función y seleccione Información general>Salidas>Agregar. Para agregar una nueva salida, seleccione Función de Azure para la opción de receptor. El adaptador de salida de Functions tiene las propiedades siguientes:
Nombre de propiedad Descripción Alias de salida Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a la salida. Opción de importación Puede utilizar la función de la suscripción actual o proporcionar la configuración manualmente si la función se encuentra en otra suscripción. Function App Nombre de la aplicación de Functions Función Nombre de la función de la aplicación de Functions (nombre de la función run.csx). Tamaño máximo de lote Establece el tamaño máximo de cada lote de salida que se envía a la función, en bytes. De manera predeterminada, este valor se establece en 262 144 bytes (256 KB). Número máximo de lotes Especifica el número máximo de eventos de cada lote que se envía a la función. El valor predeterminado es 100. Esta propiedad es opcional. Clave Permite usar una función de otra suscripción. Proporcione el valor de clave para acceder a la función. Esta propiedad es opcional. Proporcione un nombre para el alias de salida. En este tutorial, se denomina saop1, pero puede usar el nombre que prefiera. Rellene otros detalles.
Abra el trabajo de Stream Analytics y actualice la consulta a lo siguiente.
Importante
En el siguiente script de ejemplo se supone que usó CallStream para el nombre de entrada y saop1 para el nombre de salida. Si ha usado nombres diferentes, NO olvide actualizar la consulta.
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum
Inicie la aplicación telcodatagen.exe ejecutando el comando siguiente en la línea de comandos. El comando usa el formato
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
.telcodatagen.exe 1000 0.2 2
Inicie el trabajo de Stream Analytics.
En la página Supervisión de la función de Azure, verá que se invoca la función.
En la página Azure Cache for Redis, seleccione Métricas en el menú de la izquierda, agregue la métrica Escritura de caché y establezca la duración en la última hora. Verá un gráfico similar al que se muestra en la imagen siguiente.
Búsqueda de resultados en Azure Redis Cache
Obtener la clave de los registros de Azure Functions
En primer lugar, obtenga la clave de un registro insertado en Azure Cache for Redis. En el código, la clave se calcula en la función de Azure, tal como se muestra en el fragmento de código siguiente:
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Vaya a Azure Portal y busque la aplicación Azure Functions.
Seleccione Funciones en el menú de la izquierda.
En la lista de funciones, seleccione HTTPTrigger1.
En el menú de la izquierda, seleccione Supervisión.
Cambie a la pestaña Registros.
Anote una clave del mensaje informativo, tal como se muestra en la captura de pantalla siguiente. Use esta clave para buscar el valor en Azure Cache for Redis.
Usar la clave para buscar el registro en Azure Cache for Redis
Vaya a Azure Portal y encuentre su instancia de Azure Redis Cache. Seleccione Consola.
Use Azure Redis Cache para comprobar que los datos están Azure Redis Cache. (El comando toma el formato Get {key}).) Use la clave que ha copiado de los registros de Supervisión de la función de Azure (en la sección anterior).
Obtener "KEY-FROM-THE-PREVIOUS-SECTION"
Este comando debe imprimir el valor de la clave especificada:
Control de errores y reintentos
Si se produce algún error al enviar eventos a Azure Functions, Stream Analytics reintentará la mayoría de las operaciones. Todas las excepciones HTTP se reintentan hasta que se ejecutan correctamente con la excepción del error HTTP 413 (la entidad es demasiado grande). Un error de que una entidad demasiado grande se trata como un error de datos, que está sujeto a la directiva de reintento o de eliminación.
Nota
El tiempo de espera de las solicitudes HTTP de Stream Analytics a Azure Functions se establece en 100 segundos. Si la aplicación de Azure Functions tarda más de 100 segundos en procesar un lote, Stream Analytics genera un error y vuelve a intentarlo.
El reintento de tiempos de espera podría dar lugar a eventos duplicados escritos en el receptor de salida. Cuando Stream Analytics reintenta un lote con errores, incluye todos los eventos del lote. Por ejemplo, considere un lote de 20 eventos que se envían a Azure Functions desde Stream Analytics. Supongamos que Azure Functions tarda 100 segundos en procesar los primeros 10 eventos de ese lote. Tras 100 segundos, Stream Analytics suspende la solicitud porque no ha recibido una respuesta positiva de Azure Functions y se envía otra solicitud para el mismo lote. Azure Functions procesa los primeros 10 eventos del lote de nuevo, por lo que se duplican.
Problemas conocidos
En Azure Portal, al intentar restablecer el valor de Tamaño máximo de lotes o Número máximo de lotes en valor vacío (predeterminado), el valor vuelve a cambiar al valor especificado anteriormente al guardar. Especifique manualmente los valores predeterminados para estos campos en este caso.
El uso de enrutamiento HTTP en Azure Functions actualmente no es compatible con Stream Analytics.
No está habilitada la compatibilidad con la conexión a Azure Functions hospedada en una red virtual.
Limpieza de recursos
Cuando no los necesite, elimine el grupo de recursos, el trabajo de streaming y todos los recursos relacionados. La eliminación del trabajo evita la facturación de las unidades de streaming utilizadas por el trabajo. Si piensa utilizar el trabajo en el futuro, puede detenerlo y volver a iniciarlo más adelante cuando sea necesario. Si no va a seguir usando este trabajo, siga estos pasos para eliminar todos los recursos creados en este inicio rápido:
- En el menú de la izquierda de Azure Portal, seleccione Grupos de recursos y luego el nombre del recurso que creó.
- En la página del grupo de recursos, seleccione Eliminar, escriba el nombre del recurso que quiere eliminar en el cuadro de texto y, luego, seleccione Eliminar.
Pasos siguientes
En este tutorial, ha creado un trabajo de Stream Analytics simple que ejecuta una instancia de Azure Functions. Para más información sobre los trabajos de Stream Analytics, continúe con el tutorial siguiente: