Compartir a través de


Datos de flujo como entrada en Stream Analytics

Stream Analytics presenta una integración de primera clase con flujos de datos de Azure como entradas de cuatro tipos de recursos:

Estos recursos de entrada pueden proceder de la misma suscripción de Azure que el trabajo de Stream Analytics o de otra suscripción.

Compresión

Stream Analytics admite la compresión para todos los orígenes de entrada. Los tipos de compresión admitidos son: Ninguna, Gzip y Deflate. La compatibilidad con la compresión no está disponible para los datos de referencia. Si los datos de entrada fueran datos de Avro comprimidos, Stream Analytics los manipulará de forma transparente. No es necesario especificar el tipo de compresión con la serialización de Avro.

Creación, edición o prueba de entradas

Puede usar Azure Portal, Visual Studio, y Visual Studio Code para agregar, ver o editar entradas existentes en su trabajo de streaming. También puede probar conexiones de entrada y probar consultas a partir de datos de ejemplo desde Azure Portal, Visual Studio y Visual Studio Code. Al escribir una consulta, se muestra una lista de las entradas en la cláusula FROM. Puede obtener la lista de las entradas disponibles en la página Consulta del portal. Si quisiera usar varias entradas, use JOIN o escriba varias consultas SELECT.

Nota:

Se recomienda encarecidamente usar herramientas de Stream Analytics para Visual Studio Code para obtener la mejor experiencia de desarrollo local. Existen lagunas conocidas en las herramientas de Stream Analytics para Visual Studio 2019 (versión 2.6.3000.0) y no se mejorarán en el futuro.

Datos de flujo de Event Hubs

Azure Event Hubs es un agente de ingesta de eventos de publicación-suscripción altamente escalable. Un centro de eventos puede recopilar millones de eventos por segundo, por lo que se pueden procesar y analizar las grandes cantidades de datos que generan las aplicaciones y los dispositivos conectados. Juntos, Event Hubs y Stream Analytics proporcionan una solución de un extremo a otro para realizar análisis en tiempo real. Event Hubs le permite suministrar eventos a Azure en tiempo real y los trabajos de Stream Analytics pueden procesarlos también en tiempo real. Por ejemplo, se pueden enviar clics de web, lecturas de sensor o eventos de registro en línea a Event Hubs. Así, se pueden crear trabajos de Stream Analytics que usen Event Hubs para los datos de entrada para su filtrado, agregación y correlación en tiempo real.

EventEnqueuedUtcTime es la marca de tiempo de la llegada de un evento a un centro de eventos y es la marca de tiempo predeterminada de los eventos procedentes de Event Hubs hacia Stream Analytics. Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY.

Grupo de consumidores de Event Hubs

Se debería configurar cada entrada del centro de eventos para que tenga su propio grupo de consumidores. Cuando un trabajo contiene una autocombinación o tiene varias entradas, es posible que algunas entradas sean leídas por más de un lector de un nivel inferior. Esta situación afecta al número de lectores de un solo grupo de consumidores. El procedimiento recomendado para evitar superar el límite de cinco lectores por grupo de consumidores por cada partición de Event Hubs consiste en designar un grupo de consumidores para cada trabajo de Stream Analytics. También hay un límite de 20 grupos de consumidores para los centros de eventos de nivel estándar. Para obtener más información, consulte Troubleshoot Azure Stream Analytics inputs (Solución de problemas de las entradas de Azure Stream Analytics).

Creación de una entrada desde Event Hubs

En la siguiente tabla se explica cada propiedad de la página Nueva entrada de Azure Portal para transmitir la entrada de datos desde un centro de eventos:

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción de Azure en la que exista el recurso del centro de eventos.
Espacio de nombres del centro de eventos El espacio de nombres de Event Hubs es un contenedor del centros de eventos. Al crear un centro de eventos, también se crea el espacio de nombres.
Nombre del centro de eventos Nombre del centro de eventos que se usa como entrada.
Grupo de consumidores del centro de eventos (recomendado) Se recomienda usar un grupo de consumidores distinto para cada trabajo de Stream Analytics. Esta cadena identifica el grupo de consumidores que se usa para la ingesta de datos desde el centro de eventos. Si no se especifica ningún grupo de consumidores, el trabajo de Stream Analytics usa el grupo de consumidores $Default.
Modo de autenticación Especifique el tipo de autenticación que desee usar para conectarse al centro de eventos. Es posible usar una cadena de conexión o una identidad administrada para autenticarse con el centro de eventos. Para la opción de identidad administrada, podría crear una identidad administrada asignada por el sistema para el trabajo de Stream Analytics o una identidad administrada asignada por el usuario para autenticarse con el centro de eventos. Cuando se usa una identidad administrada, esta debe ser miembro de los roles de propietario de datos de Azure Event Hubs o del receptor de datos de Azure Event Hubs.
Nombre de la directiva del centro de eventos Directiva de acceso compartido que proporciona acceso a la instancia de Event Hubs. Cada directiva de acceso compartido tiene un nombre, los permisos establecidos y las claves de acceso. Esta opción se rellena automáticamente, a menos que seleccione la opción para proporcionar la configuración de Event Hubs de forma manual.
Clave de partición Es un campo opcional que solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o cualquier nivel superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usa el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo será PartitionId.
Formato de serialización de eventos El formato de serialización (JSON, CSV, Avro, Parquet u otro [Protobuf, XML, propietario, etc.]) del flujo de datos de entrada. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Encoding Por el momento, UTF-8 es el único formato de codificación compatible.
Tipo de compresión de eventos Tipo de compresión utilizado para leer el flujo de datos entrante, como Ninguno (valor predeterminado), Gzip o Deflate.
Registro de esquema (versión preliminar) Seleccione el registro de esquema con esquemas para los datos de eventos que se reciban del centro de eventos.

Cuando los datos proceden de una entrada de flujo de Event Hubs, tiene acceso a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad. Descripción
EventProcessedUtcTime Fecha y hora en que Stream Analytics procesa el evento.
EventEnqueuedUtcTime Fecha y hora en que Event Hubs recibe los eventos.
PartitionId Identificador de partición de base cero para el adaptador de entrada.

Por ejemplo, si usa estos campos, puede escribir una consulta similar al ejemplo siguiente:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Nota:

Si se usa Event Hubs como punto de conexión para las rutas del centro de IoT, será posible acceder a los metadatos del centro de IoT mediante la función GetMetadataPropertyValue.

Datos de flujo de IoT Hub

Azure IoT Hub es un agente de ingesta de eventos de suscripción y publicación muy escalable optimizado para escenarios de IoT.

La marca de tiempo predeterminada de los eventos procedentes de IoT Hub en Stream Analytics es la marca de tiempo correspondiente al momento en que el evento llega a IoT Hub, que es EventEnqueuedUtcTime. Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY.

Grupo de consumidores de IoT Hub

Se debe configurar cada entrada de IoT Hub de Stream Analytics para que tenga su propio grupo de consumidores. Cuando un trabajo contiene una autocombinación o tiene varias entradas, es posible que alguna entrada pueda leerla más de un lector de bajada. Esta situación afecta al número de lectores de un solo grupo de consumidores. El procedimiento recomendado para evitar superar el límite de cinco lectores por grupo de consumidores por cada partición de Azure IoT Hub consiste en designar un grupo de consumidores para cada trabajo de Stream Analytics.

Configuración de una instancia de IoT Hub como entrada de flujo de datos

En la siguiente tabla se explica cada propiedad de la página Nueva entrada de Azure Portal cuando se configura IoT Hub como entrada de flujo.

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción en la que se encuentra el recurso de IoT Hub.
IoT Hub Nombre de la instancia de IoT Hub que se usa como entrada.
Grupo de consumidores Se recomienda usar un grupo de consumidores distinto para cada trabajo de Stream Analytics. El grupo de consumidores que se usa para ingerir datos desde Azure IoT Hub. Stream Analytics usa el grupo de consumidores $Default, a menos que se especifique lo contrario.
Nombre de directiva de acceso compartido Directiva de acceso compartido que proporciona acceso a IoT Hub. Cada directiva de acceso compartido tiene un nombre, los permisos establecidos y las claves de acceso.
Clave de directiva de acceso compartido Clave de acceso compartido que se usa para autorizar el acceso a IoT Hub. Esta opción se rellena automáticamente, a menos que elija proporcionar la configuración de IoT Hub manualmente.
Punto de conexión Punto de conexión de IoT Hub.
Clave de partición Es un campo opcional que solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o cualquier nivel superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usase el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo será "PartitionId".
Formato de serialización de eventos El formato de serialización (JSON, CSV, Avro, Parquet u otro [Protobuf, XML, propietario, etc.]) del flujo de datos de entrada. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Encoding Por el momento, UTF-8 es el único formato de codificación compatible.
Tipo de compresión de eventos Tipo de compresión utilizado para leer el flujo de datos entrante, como Ninguno (valor predeterminado), Gzip o Deflate.

Cuando usa datos de flujo de IoT Hub, puede acceder a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
EventProcessedUtcTime Fecha y la hora en que se produjo el evento.
EventEnqueuedUtcTime Fecha y hora en que el centro de IoT recibe el evento.
PartitionId Identificador de partición de base cero para el adaptador de entrada.
IoTHub.MessageId Identificador que sirve para correlacionar la comunicación bidireccional en IoT Hub.
IoTHub.CorrelationId Identificador que se usa en las respuestas a mensajes y en los comentarios en IoT Hub.
IoTHub.ConnectionDeviceId Identificador de autenticación que se usa para enviar este mensaje. Este valor lo marca el centro de IoT en los mensajes enlazados al servicio.
IoTHub.ConnectionDeviceGenerationId Identificador de generación del dispositivo autenticado que se ha usado para enviar este mensaje. Este valor lo marca IoT Hub en los mensajes servicebound.
IoTHub.EnqueuedTime Hora en la que el centro de IoT recibe el mensaje.

Transmisión de datos de Blob Storage o Data Lake Storage Gen2

En escenarios con grandes cantidades de datos no estructurados para almacenar en la nube, Azure Blob Storage o Azure Data Lake Storage Gen2 ofrecen una solución rentable y escalable. Los datos de Blob Storage o Azure Data Lake Storage Gen2 se consideran datos en reposo. Sin embargo, Stream Analytics puede procesar estos datos como flujo de datos.

Un escenario que se usa normalmente para utilizar tales entradas con Stream Analytics es el procesamiento de registros. En este escenario, se capturan archivos de datos de telemetría de un sistema y es preciso analizarlos y procesarlos para extraer datos significativos.

La marca de tiempo predeterminada de un evento de Blob Storage o Azure Data Lake Storage Gen2 en Stream Analytics es la marca de tiempo en que se modificó por última vez, que es BlobLastModifiedUtcTime. Si se carga un blob en una cuenta de almacenamiento a las 13:00 y el trabajo de Azure Stream Analytics se inicia con la opción Now (Ahora) a las 13:01, el blob no se seleccionará, ya que su hora de modificación está fuera del período de ejecución del trabajo.

Si se carga un blob en un contenedor de cuenta de almacenamiento a las 13:00 y el trabajo Azure Stream Analytics se inicia con la opción Hora personalizada a las 13:00 o antes, el blob se recogerá, ya que su hora de modificación está dentro del período de ejecución del trabajo.

Si se iniciase un trabajo de Azure Stream Analytics mediante la opción Ahora a las 13:00 y se cargase un blob en el contenedor de la cuenta de almacenamiento a las 13:01, Azure Stream Analytics recogerá dicho blob. La marca de tiempo asignada a cada blob se basa únicamente en BlobLastModifiedTime. La carpeta en la que se encuentra el blob no guarda ninguna relación con la marca de tiempo asignada. Por ejemplo, si hubiera un blob 2019/10-01/00/b1.txt con un BlobLastModifiedTime de 2019-11-11, la marca de tiempo asignada a este blob sería 2019-11-11.

Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY. Un trabajo de Stream Analytics extrae datos de entrada de Azure Blob Storage o Azure Data Lake Storage Gen2 cada segundo si el archivo de blob está disponible. Si el archivo de blob no está disponible, hay un retroceso exponencial con un retraso de tiempo máximo de 90 segundos.

Nota:

Stream Analytics no permite agregar contenido a un archivo de blob existente. Stream Analytics solo verá cada archivo una vez y los cambios que se produzcan en él después de que el trabajo lea los datos no se procesan. Se recomienda cargar todos los datos de un archivo de blob a la vez y, a continuación, agregar los eventos más recientes a un archivo de blob diferente y nuevo.

En aquellos escenarios en los que se agregan continuamente muchos blobs que Stream Analytics procesa a medida que se agregan, es posible que se omitan algunos blobs en raras ocasiones debido a la granularidad del objeto BlobLastModifiedTime. Esto se puede mitigar cargando los blobs con al menos dos segundos de diferencia. Si esta opción no es factible, se puede usar Event Hubs para transmitir grandes volúmenes de eventos.

Configuración de Blob Storage como entrada de flujo

En la siguiente tabla se explica cada propiedad de la página Nueva entrada de Azure Portal cuando se configura Blob Storage como entrada de flujo.

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción en la que se encuentra el recurso de almacenamiento.
Cuenta de almacenamiento Nombre de la cuenta de almacenamiento donde se encuentran los archivos de blob.
Clave de cuenta de almacenamiento La clave secreta asociada con la cuenta de almacenamiento. Esta opción se rellena automáticamente, a menos que elija proporcionar la configuración de forma manual.
Contenedor Los contenedores proporcionan una agrupación lógica de los blobs. Puede elegir el contenedor Usar existente o Crear nuevo para crear un contenedor.
Modo de autenticación Especifique el tipo de autenticación que desea usar para conectarse a la cuenta de almacenamiento. Es posible usar una cadena de conexión o una identidad administrada para autenticarse con la cuenta de almacenamiento. Para la opción de identidad administrada, podría crear una identidad administrada asignada por el sistema al trabajo de Stream Analytics o una identidad administrada asignada por el usuario para autenticarse con la cuenta de almacenamiento. Cuando se usa una identidad administrada, la identidad administrada debe ser miembro de un rol adecuado de la cuenta de almacenamiento.
Patrón de ruta de acceso (opcional) Ruta de acceso de archivo que sirve para ubicar los blobs dentro del contenedor especificado. Si desea leer los blobs de la raíz del contenedor, no establezca un patrón de ruta de acceso. Dentro de la ruta, puede especificar una o más instancias de las tres variables siguientes: {date}, {time} o {partition}.

Ejemplo 1: cluster1/logs/{date}/{time}/{partition}

Ejemplo 2: cluster1/logs/{date}

El carácter * no es un valor permitido para el prefijo de ruta de acceso. Solo se permiten caracteres de Blob de Azure. No incluya nombres de contenedor ni nombres de archivo.
Formato de fecha (opcional) Si usa la variable de fecha en la ruta, formato de fecha por el que se organizan los archivos. Ejemplo: YYYY/MM/DD

Cuando la entrada de blob incluye {date} o {time} en su ruta de acceso, las carpetas se examinan en orden temporal ascendente.
Formato de hora (opcional) Si usa la variable de hora en la ruta, formato de hora por el que se organizan los archivos. Actualmente, el único valor admitido es HH para las horas.
Clave de partición Es un campo opcional que solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o cualquier nivel superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usase el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo será "PartitionId".
Count of input partitions (Recuento de particiones de entrada) Este campo solo está presente cuando {partition} existe en el patrón de ruta de acceso. El valor de esta propiedad es un entero > = 1. Siempre que {partition} aparezca en pathPattern, se usará un número entre 0 y el valor de este campo -1.
Formato de serialización de eventos El formato de serialización (JSON, CSV, Avro, Parquet u otro [Protobuf, XML, propietario, etc.]) del flujo de datos de entrada. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Encoding Por el momento, UTF-8 es el único formato de codificación compatible para CSV y JSON.
Compresión Tipo de compresión utilizado para leer el flujo de datos entrante, como Ninguno (valor predeterminado), Gzip o Deflate.

Cuando los datos proceden de un origen de Blob Storage, puede acceder a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
BlobName Nombre del blob de entrada de donde procede el evento.
EventProcessedUtcTime Fecha y hora en que Stream Analytics procesa el evento.
BlobLastModifiedUtcTime Fecha y la hora en que se modificó por última vez el blob.
PartitionId Identificador de partición de base cero para el adaptador de entrada.

Por ejemplo, si usa estos campos, puede escribir una consulta similar al ejemplo siguiente:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Transmisión de datos desde Apache Kafka

Azure Stream Analytics permite conectarse directamente a clústeres de Apache Kafka para ingerir datos. La solución es de poco código y está totalmente administrada por el equipo de Azure Stream Analytics en Microsoft, lo que le permite cumplir los estándares de cumplimiento empresarial. La entrada de Kafka es compatible con versiones anteriores y admite todas las versiones con la versión de cliente más reciente a partir de la versión 0.10. Los usuarios pueden conectarse a clústeres de Kafka dentro de una red virtual y a clústeres de Kafka con un punto de conexión público, en función de la configuración. La configuración se basa en las convenciones de configuración de Kafka existentes. Los tipos de compresión admitidos son None, Gzip, Snappy, LZ4 y Zstd.

Para obtener más información, consulte Transmisión de datos de Kafka a Azure Stream Analytics (versión preliminar).

Pasos siguientes