Поделиться через


Анализ данных JSON и AVRO в Azure Stream Analytics

Служба Azure Stream Analytics поддерживает обработку событий в форматах данных CSV, JSON и Avro. Данные JSON и Avro могут быть структурированными и содержать сложные типы, такие как вложенные объекты (записи) и массивы.

Примечание.

Файлы AVRO, созданные в Центрах событий, используют определенный формат, который требует использования пользовательской функции десериализатора . Дополнительные сведения см. в разделе Чтение входных данных в любом формате с помощью пользовательских десериализаторов .NET.

Тип данных "запись"

Тип данных "запись" используется для представления массивов JSON и Avro, когда соответствующие форматы используются во входных потоках данных. Эти примеры демонстрируют пример датчика, который считывает входные события в формате JSON. Ниже приведен пример одного события:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Доступ к вложенным полям в известной схеме

Используйте точечную нотацию (.) для простого доступа к вложенным полям непосредственно из запроса. Например, этот запрос выбирает координаты широты и долготы в свойстве Location из предыдущего фрагмента данных JSON. Нотация точек может использоваться для навигации по нескольким уровням, как показано в следующем фрагменте кода:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Результат:

DeviceID Шир. Long Температура Версия
12345 47 122 80 1.2.45

Выбор всех свойств

Все свойства вложенной записи можно выбрать с помощью подстановочного знака "*". Рассмотрим следующий пример:

SELECT
    DeviceID,
    Location.*
FROM input

Результат:

DeviceID Шир. Long
12345 47 122

Доступ к вложенным полям, если имя свойства является переменной

Используйте функцию GetRecordPropertyValue, если имя свойства является переменной. Он позволяет создавать динамические запросы без жесткого обозначения имен свойств.

Например, представьте пример потока данных, который нужно соединить с эталонными данными, содержащими пороговые значения для каждого датчика устройства. Фрагмент таких ссылочных данных показан в следующем фрагменте кода.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Цель состоит в том, чтобы присоединить наш пример набора данных из верхней части статьи к этим эталонным данным и вывести одно событие для каждой меры датчика выше порогового значения. Это означает, что с помощью объединения одно событие может создать несколько выходных событий, если несколько датчиков превысили пороги. Чтобы добиться аналогичных результатов без соединения, см. следующий пример:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue выбирает свойство в SensorReadings, имя которого совпадает с именем свойства, поступающего от эталонных данных. Затем извлекается связанное значение из SensorReadings.

Результат:

DeviceID SensorName AlertMessage
12345 Влажность Оповещение: датчик выше порогового значения

Преобразование полей записей в отдельные события

Чтобы преобразовать поля записей в отдельные события, используйте оператор APPLY вместе с функцией GetRecordProperties.

С исходными примерами данных можно использовать следующий запрос для извлечения свойств в различные события.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Результат:

DeviceID SensorName AlertMessage
12345 Температура 80
12345 Влажность 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Используя WITH, можно перенаправлять эти события в разные места назначения:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Анализ записи JSON в эталонных данных SQL

При использовании базы данных SQL Azure в качестве эталонных данных в задании может существовать столбец, содержащий данные в формате JSON. Пример показан в следующем примере:

DeviceID Data
12345 {"key": "value1"}
54321 {"key": "value2"}

Можно выполнить синтаксический анализ записи JSON в столбце Данные, написав простую определяемую пользователем функцию JavaScript.

function parseJson(string) {
return JSON.parse(string);
}

Затем вы можете создать шаг в запросе Stream Analytics, как показано здесь, чтобы получить доступ к полям записей JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Тип данных "массив"

Тип данных "массив" представляет собой упорядоченную коллекцию значений. Ниже описаны некоторые типичные операции с значениями массива. В этих примерах используются функции GetArrayElement, GetArrayElements, GetArrayLength и оператор APPLY.

Ниже приведен пример события. CustomSensor03 и SensorMetadata принадлежат к типу array:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Работа с конкретным элементом массива

Выберите элемент массива по указанному индексу (первый элемент массива):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Результат:

firstElement
12

Выбор длины массива

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Результат:

arrayLength
3

Преобразование элементов массива в отдельные события

Выберите все элементы массива как отдельные события. Оператор APPLY вместе со встроенной функцией GetArrayElements извлекает все элементы массива как отдельные события:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Результат:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Результат:

DeviceId smKey smValue
12345 Производитель ABC
12345 Версия 1.2.45

Если извлеченные поля должны отображаться в столбцах, можно свести набор данных с помощью синтаксиса WITH в дополнение к операции JOIN . Для этого соединения требуется условие границ времени, которое предотвращает дублирование:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Результат:

DeviceId Шир. Long smVersion smManufacturer
12345 47 122 1.2.45 ABC

Типы данных в Azure Stream Analytics