Analizowanie danych JSON i Avro w usłudze Azure Stream Analytics
Usługa Azure Stream Analytics obsługuje przetwarzanie zdarzeń w formatach danych CSV, JSON i Avro. Zarówno dane JSON, jak i Avro mogą być ustrukturyzowane i zawierają niektóre złożone typy, takie jak zagnieżdżone obiekty (rekordy) i tablice.
Uwaga
Pliki AVRO utworzone przez funkcję przechwytywania usługi Event Hubs używają określonego formatu, który wymaga użycia niestandardowej funkcji deserializacji . Aby uzyskać więcej informacji, zobacz Odczyt danych wejściowych w dowolnym formacie przy użyciu niestandardowych deserializacji platformy .NET.
Rejestrowanie typów danych
Typy danych rekordów służą do reprezentowania tablic JSON i Avro, gdy odpowiednie formaty są używane w strumieniach danych wejściowych. W tych przykładach pokazano przykładowy czujnik, który odczytuje zdarzenia wejściowe w formacie JSON. Oto przykład pojedynczego zdarzenia:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Uzyskiwanie dostępu do pól zagnieżdżonych w znanym schemacie
Użyj notacji kropkowej (.), aby łatwo uzyskać dostęp do zagnieżdżonych pól bezpośrednio z zapytania. Na przykład to zapytanie wybiera współrzędne szerokości i długości geograficznej we właściwości Location w poprzednich danych JSON. Notacja kropkowa może służyć do nawigowania po wielu poziomach, jak pokazano w poniższym fragmencie kodu:
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Wynik to:
Identyfikator urządzenia | Lat | Długi | Temperatura | Wersja |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Wybierz wszystkie właściwości
Możesz wybrać wszystkie właściwości zagnieżdżonego rekordu przy użyciu symbolu wieloznakowego "*". Rozważmy następujący przykład:
SELECT
DeviceID,
Location.*
FROM input
Wynik to:
Identyfikator urządzenia | Lat | Długi |
---|---|---|
12345 | 47 | 122 |
Uzyskiwanie dostępu do pól zagnieżdżonych, gdy nazwa właściwości jest zmienną
Użyj funkcji GetRecordPropertyValue, jeśli nazwa właściwości jest zmienną. Umożliwia tworzenie zapytań dynamicznych bez kodowania nazw właściwości.
Załóżmy na przykład, że przykładowy strumień danych musi być połączony z danymi referencyjnymi zawierającymi progi dla każdego czujnika urządzenia. Fragment takich danych referencyjnych jest wyświetlany w poniższym fragmencie kodu.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Celem jest dołączenie naszego przykładowego zestawu danych z góry artykułu do tych danych referencyjnych i wyprowadzenie jednego zdarzenia dla każdej miary czujnika powyżej progu. Oznacza to, że nasze pojedyncze zdarzenie powyżej może wygenerować wiele zdarzeń wyjściowych, jeśli wiele czujników przekracza odpowiednie progi, dzięki sprzężeniu. Aby uzyskać podobne wyniki bez sprzężenia, zobacz następujący przykład:
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
Właściwość GetRecordPropertyValue wybiera właściwość SensorReadings, która jest zgodna z nazwą właściwości pochodzącej z danych referencyjnych. Następnie zostanie wyodrębniona skojarzona wartość z funkcji SensorReadings .
Wynik to:
Identyfikator urządzenia | SensorName | AlertMessage |
---|---|---|
12345 | Wilgotność | Alert: Czujnik powyżej progu |
Konwertowanie pól rekordów na oddzielne zdarzenia
Aby przekonwertować pola rekordów na oddzielne zdarzenia, użyj operatora APPLY razem z funkcją GetRecordProperties .
W przypadku oryginalnych przykładowych danych następujące zapytanie może służyć do wyodrębniania właściwości do różnych zdarzeń.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Wynik to:
Identyfikator urządzenia | SensorName | AlertMessage |
---|---|---|
12345 | Temperatura | 80 |
12345 | Wilgotność | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Za pomocą funkcji WITH możliwe jest kierowanie tych zdarzeń do różnych miejsc docelowych:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Analizowanie rekordu JSON w danych referencyjnych SQL
W przypadku korzystania z usługi Azure SQL Database jako danych referencyjnych w zadaniu można mieć kolumnę zawierającą dane w formacie JSON. Przykład pokazano w poniższym przykładzie:
Identyfikator urządzenia | Data |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
Rekord JSON można przeanalizować w kolumnie Dane , pisząc prostą funkcję zdefiniowaną przez użytkownika w języku JavaScript.
function parseJson(string) {
return JSON.parse(string);
}
Następnie możesz utworzyć krok w zapytaniu usługi Stream Analytics, jak pokazano tutaj, aby uzyskać dostęp do pól rekordów JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Typy danych tablicy
Typy danych tablicy to uporządkowana kolekcja wartości. Poniżej opisano niektóre typowe operacje dotyczące wartości tablicy. W tych przykładach użyto funkcji GetArrayElement, GetArrayElements, GetArrayLength i operatora APPLY .
Oto przykład zdarzenia. Oba CustomSensor03
typy i SensorMetadata
są tablicą typów:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Praca z określonym elementem tablicy
Wybierz element tablicy w określonym indeksie (wybierając pierwszy element tablicy):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Wynik to:
firstElement |
---|
12 |
Wybieranie długości tablicy
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Wynik to:
arrayLength |
---|
3 |
Konwertowanie elementów tablicy na oddzielne zdarzenia
Wybierz wszystkie elementy tablicy jako pojedyncze zdarzenia. Operator APPLY wraz z wbudowaną funkcją GetArrayElements wyodrębnia wszystkie elementy tablicy jako pojedyncze zdarzenia:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Wynik to:
DeviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Wynik to:
DeviceId | smKey | smValue |
---|---|---|
12345 | Producent | ABC |
12345 | Wersja | 1.2.45 |
Jeśli wyodrębnione pola muszą być wyświetlane w kolumnach, można przestawić zestaw danych przy użyciu składni WITH oprócz operacji JOIN . To sprzężenia wymaga warunku granicy czasu, który uniemożliwia duplikowanie:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Wynik to:
DeviceId | Lat | Długi | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |