Analizowanie danych JSON i Avro w usłudze Azure Stream Analytics
Usługa Azure Stream Analytics obsługuje przetwarzanie zdarzeń w formatach danych CSV, JSON i Avro. Zarówno dane JSON, jak i Avro mogą być ustrukturyzowane i zawierają niektóre złożone typy, takie jak zagnieżdżone obiekty (rekordy) i tablice.
Uwaga
Pliki AVRO utworzone przez funkcję przechwytywania centrum zdarzeń używają określonego formatu, który wymaga użycia niestandardowej funkcji deserializatora . Aby uzyskać więcej informacji, zobacz Odczyt danych wejściowych w dowolnym formacie przy użyciu niestandardowych deserializacji platformy .NET.
Rejestrowanie typów danych
Typy danych rekordów są używane do reprezentowania tablic JSON i Avro, gdy odpowiednie formaty są używane w strumieniach danych wejściowych. W tych przykładach pokazano przykładowy czujnik, który odczytuje zdarzenia wejściowe w formacie JSON. Oto przykład pojedynczego zdarzenia:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Uzyskiwanie dostępu do zagnieżdżonych pól w znanym schemacie
Użyj notacji kropkowej (.), aby łatwo uzyskać dostęp do zagnieżdżonych pól bezpośrednio z zapytania. Na przykład to zapytanie wybiera współrzędne szerokości i długości geograficznej w ramach właściwości Location w poprzednich danych JSON. Notacja kropkowa może służyć do nawigowania po wielu poziomach, jak pokazano poniżej.
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Wynik to:
Deviceid | Lat | Długi | Temperatura | Wersja |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Wybierz wszystkie właściwości
Możesz wybrać wszystkie właściwości zagnieżdżonego rekordu przy użyciu symbolu wieloznacznych "*". Rozpatrzmy następujący przykład:
SELECT
DeviceID,
Location.*
FROM input
Wynik to:
Deviceid | Lat | Długi |
---|---|---|
12345 | 47 | 122 |
Uzyskiwanie dostępu do zagnieżdżonych pól, gdy nazwa właściwości jest zmienną
Użyj funkcji GetRecordPropertyValue , jeśli nazwa właściwości jest zmienną. Umożliwia to tworzenie zapytań dynamicznych bez twardych nazw właściwości.
Załóżmy na przykład, że przykładowy strumień danych musi być połączony z danymi referencyjnymi zawierającymi progi dla każdego czujnika urządzenia. Poniżej przedstawiono fragment kodu z takimi danymi referencyjnymi.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Celem jest dołączenie naszego przykładowego zestawu danych z góry artykułu do tych danych referencyjnych i wyprowadzenie jednego zdarzenia dla każdej miary czujnika powyżej progu. Oznacza to, że nasze pojedyncze zdarzenie powyżej może wygenerować wiele zdarzeń wyjściowych, jeśli wiele czujników przekracza odpowiednie progi, dzięki sprzężeniu. Aby uzyskać podobne wyniki bez sprzężenia, zobacz sekcję poniżej.
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
Polecenie GetRecordPropertyValue wybiera właściwość SensorReadings, która odpowiada nazwie właściwości pochodzącej z danych referencyjnych. Następnie wyodrębniona jest skojarzona wartość z sensorReadings .
Wynik to:
Deviceid | SensorName | AlertMessage |
---|---|---|
12345 | Wilgotność | Alert: Czujnik powyżej progu |
Konwertowanie pól rekordów na oddzielne zdarzenia
Aby przekonwertować pola rekordów na oddzielne zdarzenia, użyj operatora APPLY razem z funkcją GetRecordProperties .
W przypadku oryginalnych przykładowych danych poniższe zapytanie może służyć do wyodrębniania właściwości do różnych zdarzeń.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Wynik to:
Deviceid | SensorName | AlertMessage |
---|---|---|
12345 | Temperatura | 80 |
12345 | Wilgotność | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Za pomocą funkcji WITH możliwe jest kierowanie tych zdarzeń do różnych miejsc docelowych:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Analizowanie rekordu JSON w danych referencyjnych SQL
W przypadku używania Azure SQL Database jako danych referencyjnych w zadaniu można mieć kolumnę zawierającą dane w formacie JSON. Przykład przedstawiono poniżej.
Deviceid | Dane |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
Rekord JSON można przeanalizować w kolumnie Dane , pisząc prostą funkcję zdefiniowaną przez użytkownika w języku JavaScript.
function parseJson(string) {
return JSON.parse(string);
}
Następnie możesz utworzyć krok w zapytaniu usługi Stream Analytics, jak pokazano poniżej, aby uzyskać dostęp do pól rekordów JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Typy danych tablicy
Typy danych tablicy to uporządkowana kolekcja wartości. Poniżej przedstawiono niektóre typowe operacje dotyczące wartości tablicy. W tych przykładach użyto funkcji GetArrayElement, GetArrayElements, GetArrayLength i operatora APPLY.
Oto przykład zdarzenia. Zarówno, jak CustomSensor03
i SensorMetadata
są typu tablica:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Praca z określonym elementem tablicy
Wybierz element tablicy w określonym indeksie (wybierając pierwszy element tablicy):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Wynik to:
firstElement |
---|
12 |
Wybieranie długości tablicy
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Wynik to:
arrayLength |
---|
3 |
Konwertowanie elementów tablicy na oddzielne zdarzenia
Wybierz wszystkie elementy tablicy jako pojedyncze zdarzenia. Operator APPLY wraz z wbudowaną funkcją GetArrayElements wyodrębnia wszystkie elementy tablicy jako pojedyncze zdarzenia:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Wynik to:
DeviceId | Arrayindex | Arrayvalue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Wynik to:
DeviceId | smKey | smValue |
---|---|---|
12345 | Producent | ABC |
12345 | Wersja | 1.2.45 |
Jeśli wyodrębnione pola muszą być wyświetlane w kolumnach, można przestawić zestaw danych przy użyciu składni WITH oprócz operacji JOIN . To sprzężenia wymaga warunku granicy czasu , który uniemożliwia duplikowanie:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Wynik to:
DeviceId | Lat | Długi | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |