Анализ данных JSON и AVRO в Azure Stream Analytics
Azure Stream Analytics поддерживает обработку событий в форматах CSV, JSON и Avro. Данные JSON и Avro могут быть структурированными и содержать сложные типы, такие как вложенные объекты (записи) и массивы.
Примечание
Файлы AVRO, созданные функцией записи концентратора событий, используют конкретный формат, который требует функцию пользовательского десериализатора. Дополнительные сведения см. в разделе Чтение входных данных в любом формате с помощью пользовательских десериализаторов .NET.
Тип данных "запись"
Тип данных "запись" используется для представления массивов JSON и Avro, когда соответствующие форматы используются во входных потоках данных. Эти примеры демонстрируют пример датчика, который считывает входные события в формате JSON. Ниже приведен пример одного события:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Доступ к вложенным полям в известной схеме
Используйте точечную нотацию (.) для простого доступа к вложенным полям непосредственно из запроса. Например, этот запрос выбирает координаты широты и долготы в свойстве Location из предыдущего фрагмента данных JSON. Точечную нотацию можно использовать для навигации на нескольких уровнях, как показано ниже.
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Результат:
DeviceID | Lat | Long | температура; | Версия |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Выбор всех свойств
Все свойства вложенной записи можно выбрать с помощью подстановочного знака "*". Рассмотрим следующий пример:
SELECT
DeviceID,
Location.*
FROM input
Результат:
DeviceID | Lat | Long |
---|---|---|
12345 | 47 | 122 |
Доступ к вложенным полям, если имя свойства является переменной
Используйте функцию GetRecordPropertyValue, если имя свойства является переменной. Это позволяет создавать динамические запросы без заданных имен свойств.
Например, представьте пример потока данных, который нужно соединить с эталонными данными, содержащими пороговые значения для каждого датчика устройства. Ниже приведен фрагмент кода с такими эталонными данными.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Цель этого примера — присоединить наш пример набора данных из верхней части статьи к этим эталонным данным и вывести одно событие для каждой меры датчика выше порогового значения. Это означает, что с помощью объединения одно событие может создать несколько выходных событий, если несколько датчиков превысили пороги. Чтобы получить аналогичные результаты без объединения, см. раздел ниже.
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue выбирает свойство в SensorReadings, имя которого совпадает с именем свойства, поступающего от эталонных данных. Затем извлекается связанное значение из SensorReadings.
Результат:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | влажность. | Предупреждение. Датчик, превышающий пороговое значение |
Преобразование полей записей в отдельные события
Чтобы преобразовать поля записей в отдельные события, используйте оператор APPLY вместе с функцией GetRecordProperties.
С исходными примерами данных можно использовать следующий запрос для извлечения свойств в различные события.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Результат:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | температура; | 80 |
12345 | влажность. | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Используя WITH, можно перенаправлять эти события в разные места назначения:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Анализ записи JSON в эталонных данных SQL
При использовании базы данных SQL Azure в качестве эталонных данных в задании может существовать столбец, содержащий данные в формате JSON. Ниже приведен пример такого файла.
DeviceID | Данные |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
Можно выполнить синтаксический анализ записи JSON в столбце Данные, написав простую определяемую пользователем функцию JavaScript.
function parseJson(string) {
return JSON.parse(string);
}
Затем можно создать шаг в запросе Stream Analytics, как показано ниже, чтобы получить доступ к полям записей JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Тип данных "массив"
Тип данных "массив" представляет собой упорядоченную коллекцию значений. Ниже приведены типичные операции со значениями массивов. В этих примерах используются функции GetArrayElement, GetArrayElements, GetArrayLength и оператор APPLY.
Ниже приведен пример события.
CustomSensor03
и SensorMetadata
принадлежат к типу array:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Работа с конкретным элементом массива
Выберите элемент массива по указанному индексу (первый элемент массива):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Результат:
firstElement |
---|
12 |
Выбор длины массива
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Результат:
arrayLength |
---|
3 |
Преобразование элементов массива в отдельные события
Выберите все элементы массива как отдельные события. Оператор APPLY вместе со встроенной функцией GetArrayElements извлекает все элементы массива как отдельные события:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Результат:
deviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Результат:
deviceId | smKey | smValue |
---|---|---|
12345 | Производитель | ABC |
12345 | Версия | 1.2.45 |
Если извлеченные поля должны появиться в столбцах, можно выполнить сведение набора данных с помощью синтаксиса WITH в дополнение к операции JOIN. Для этого соединения требуется условие границы времени , которое предотвращает дублирование:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Результат:
deviceId | Lat | Long | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |