Анализ данных JSON и AVRO в Azure Stream Analytics
Служба Azure Stream Analytics поддерживает обработку событий в форматах данных CSV, JSON и Avro. Данные JSON и Avro могут быть структурированными и содержать сложные типы, такие как вложенные объекты (записи) и массивы.
Примечание.
Файлы AVRO, созданные в Центрах событий, используют определенный формат, который требует использования пользовательской функции десериализатора . Дополнительные сведения см. в разделе Чтение входных данных в любом формате с помощью пользовательских десериализаторов .NET.
Тип данных "запись"
Тип данных "запись" используется для представления массивов JSON и Avro, когда соответствующие форматы используются во входных потоках данных. Эти примеры демонстрируют пример датчика, который считывает входные события в формате JSON. Ниже приведен пример одного события:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Доступ к вложенным полям в известной схеме
Используйте точечную нотацию (.) для простого доступа к вложенным полям непосредственно из запроса. Например, этот запрос выбирает координаты широты и долготы в свойстве Location из предыдущего фрагмента данных JSON. Нотация точек может использоваться для навигации по нескольким уровням, как показано в следующем фрагменте кода:
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Результат:
DeviceID | Шир. | Long | Температура | Версия |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Выбор всех свойств
Все свойства вложенной записи можно выбрать с помощью подстановочного знака "*". Рассмотрим следующий пример:
SELECT
DeviceID,
Location.*
FROM input
Результат:
DeviceID | Шир. | Long |
---|---|---|
12345 | 47 | 122 |
Доступ к вложенным полям, если имя свойства является переменной
Используйте функцию GetRecordPropertyValue, если имя свойства является переменной. Он позволяет создавать динамические запросы без жесткого обозначения имен свойств.
Например, представьте пример потока данных, который нужно соединить с эталонными данными, содержащими пороговые значения для каждого датчика устройства. Фрагмент таких ссылочных данных показан в следующем фрагменте кода.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Цель состоит в том, чтобы присоединить наш пример набора данных из верхней части статьи к этим эталонным данным и вывести одно событие для каждой меры датчика выше порогового значения. Это означает, что с помощью объединения одно событие может создать несколько выходных событий, если несколько датчиков превысили пороги. Чтобы добиться аналогичных результатов без соединения, см. следующий пример:
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue выбирает свойство в SensorReadings, имя которого совпадает с именем свойства, поступающего от эталонных данных. Затем извлекается связанное значение из SensorReadings.
Результат:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | Влажность | Оповещение: датчик выше порогового значения |
Преобразование полей записей в отдельные события
Чтобы преобразовать поля записей в отдельные события, используйте оператор APPLY вместе с функцией GetRecordProperties.
С исходными примерами данных можно использовать следующий запрос для извлечения свойств в различные события.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Результат:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | Температура | 80 |
12345 | Влажность | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Используя WITH, можно перенаправлять эти события в разные места назначения:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Анализ записи JSON в эталонных данных SQL
При использовании базы данных SQL Azure в качестве эталонных данных в задании может существовать столбец, содержащий данные в формате JSON. Пример показан в следующем примере:
DeviceID | Data |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
Можно выполнить синтаксический анализ записи JSON в столбце Данные, написав простую определяемую пользователем функцию JavaScript.
function parseJson(string) {
return JSON.parse(string);
}
Затем вы можете создать шаг в запросе Stream Analytics, как показано здесь, чтобы получить доступ к полям записей JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Тип данных "массив"
Тип данных "массив" представляет собой упорядоченную коллекцию значений. Ниже описаны некоторые типичные операции с значениями массива. В этих примерах используются функции GetArrayElement, GetArrayElements, GetArrayLength и оператор APPLY.
Ниже приведен пример события.
CustomSensor03
и SensorMetadata
принадлежат к типу array:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Работа с конкретным элементом массива
Выберите элемент массива по указанному индексу (первый элемент массива):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Результат:
firstElement |
---|
12 |
Выбор длины массива
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Результат:
arrayLength |
---|
3 |
Преобразование элементов массива в отдельные события
Выберите все элементы массива как отдельные события. Оператор APPLY вместе со встроенной функцией GetArrayElements извлекает все элементы массива как отдельные события:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Результат:
DeviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Результат:
DeviceId | smKey | smValue |
---|---|---|
12345 | Производитель | ABC |
12345 | Версия | 1.2.45 |
Если извлеченные поля должны отображаться в столбцах, можно свести набор данных с помощью синтаксиса WITH в дополнение к операции JOIN . Для этого соединения требуется условие границ времени, которое предотвращает дублирование:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Результат:
DeviceId | Шир. | Long | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |