Поделиться через


Анализ данных JSON и AVRO в Azure Stream Analytics

Azure Stream Analytics поддерживает обработку событий в форматах CSV, JSON и Avro. Данные JSON и Avro могут быть структурированными и содержать сложные типы, такие как вложенные объекты (записи) и массивы.

Примечание

Файлы AVRO, созданные функцией записи концентратора событий, используют конкретный формат, который требует функцию пользовательского десериализатора. Дополнительные сведения см. в разделе Чтение входных данных в любом формате с помощью пользовательских десериализаторов .NET.

Тип данных "запись"

Тип данных "запись" используется для представления массивов JSON и Avro, когда соответствующие форматы используются во входных потоках данных. Эти примеры демонстрируют пример датчика, который считывает входные события в формате JSON. Ниже приведен пример одного события:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Доступ к вложенным полям в известной схеме

Используйте точечную нотацию (.) для простого доступа к вложенным полям непосредственно из запроса. Например, этот запрос выбирает координаты широты и долготы в свойстве Location из предыдущего фрагмента данных JSON. Точечную нотацию можно использовать для навигации на нескольких уровнях, как показано ниже.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Результат:

DeviceID Lat Long температура; Версия
12345 47 122 80 1.2.45

Выбор всех свойств

Все свойства вложенной записи можно выбрать с помощью подстановочного знака "*". Рассмотрим следующий пример:

SELECT
    DeviceID,
    Location.*
FROM input

Результат:

DeviceID Lat Long
12345 47 122

Доступ к вложенным полям, если имя свойства является переменной

Используйте функцию GetRecordPropertyValue, если имя свойства является переменной. Это позволяет создавать динамические запросы без заданных имен свойств.

Например, представьте пример потока данных, который нужно соединить с эталонными данными, содержащими пороговые значения для каждого датчика устройства. Ниже приведен фрагмент кода с такими эталонными данными.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Цель этого примера — присоединить наш пример набора данных из верхней части статьи к этим эталонным данным и вывести одно событие для каждой меры датчика выше порогового значения. Это означает, что с помощью объединения одно событие может создать несколько выходных событий, если несколько датчиков превысили пороги. Чтобы получить аналогичные результаты без объединения, см. раздел ниже.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue выбирает свойство в SensorReadings, имя которого совпадает с именем свойства, поступающего от эталонных данных. Затем извлекается связанное значение из SensorReadings.

Результат:

DeviceID SensorName AlertMessage
12345 влажность. Предупреждение. Датчик, превышающий пороговое значение

Преобразование полей записей в отдельные события

Чтобы преобразовать поля записей в отдельные события, используйте оператор APPLY вместе с функцией GetRecordProperties.

С исходными примерами данных можно использовать следующий запрос для извлечения свойств в различные события.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Результат:

DeviceID SensorName AlertMessage
12345 температура; 80
12345 влажность. 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Используя WITH, можно перенаправлять эти события в разные места назначения:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Анализ записи JSON в эталонных данных SQL

При использовании базы данных SQL Azure в качестве эталонных данных в задании может существовать столбец, содержащий данные в формате JSON. Ниже приведен пример такого файла.

DeviceID Данные
12345 {"key": "value1"}
54321 {"key": "value2"}

Можно выполнить синтаксический анализ записи JSON в столбце Данные, написав простую определяемую пользователем функцию JavaScript.

function parseJson(string) {
return JSON.parse(string);
}

Затем можно создать шаг в запросе Stream Analytics, как показано ниже, чтобы получить доступ к полям записей JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Тип данных "массив"

Тип данных "массив" представляет собой упорядоченную коллекцию значений. Ниже приведены типичные операции со значениями массивов. В этих примерах используются функции GetArrayElement, GetArrayElements, GetArrayLength и оператор APPLY.

Ниже приведен пример события. CustomSensor03 и SensorMetadata принадлежат к типу array:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Работа с конкретным элементом массива

Выберите элемент массива по указанному индексу (первый элемент массива):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Результат:

firstElement
12

Выбор длины массива

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Результат:

arrayLength
3

Преобразование элементов массива в отдельные события

Выберите все элементы массива как отдельные события. Оператор APPLY вместе со встроенной функцией GetArrayElements извлекает все элементы массива как отдельные события:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Результат:

deviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Результат:

deviceId smKey smValue
12345 Производитель ABC
12345 Версия 1.2.45

Если извлеченные поля должны появиться в столбцах, можно выполнить сведение набора данных с помощью синтаксиса WITH в дополнение к операции JOIN. Для этого соединения требуется условие границы времени , которое предотвращает дублирование:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Результат:

deviceId Lat Long smVersion smManufacturer
12345 47 122 1.2.45 ABC

См. также:

Типы данных в Azure Stream Analytics