Parsa JSON- och Avro-data i Azure Stream Analytics
Azure Stream Analytics stöder bearbetning av händelser i CSV-, JSON- och Avro-dataformat. Både JSON- och Avro-data kan struktureras och innehålla vissa komplexa typer, till exempel kapslade objekt (poster) och matriser.
Anteckning
AVRO-filer som skapats av Event Hub Capture använder ett visst format som kräver att du använder den anpassade deserialiserarfunktionen . Mer information finns i Läsa indata i valfritt format med anpassade .NET-deserialiserare.
Registrera datatyper
Postdatatyper används för att representera JSON- och Avro-matriser när motsvarande format används i indataströmmarna. De här exemplen visar en exempelsensor som läser indatahändelser i JSON-format. Här är ett exempel på en enskild händelse:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Få åtkomst till kapslade fält i ett känt schema
Använd punktnotation (.) för att enkelt komma åt kapslade fält direkt från din fråga. Den här frågan väljer till exempel koordinaterna Latitud och Longitud under egenskapen Plats i föregående JSON-data. Punktnotationen kan användas för att navigera i flera nivåer enligt nedan.
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Resultatet är:
Deviceid | Lat | Lång | Temperatur | Version |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Markera alla egenskaper
Du kan välja alla egenskaper för en kapslad post med jokertecknet *. Se följande exempel:
SELECT
DeviceID,
Location.*
FROM input
Resultatet är:
Deviceid | Lat | Lång |
---|---|---|
12345 | 47 | 122 |
Få åtkomst till kapslade fält när egenskapsnamnet är en variabel
Använd funktionen GetRecordPropertyValue om egenskapsnamnet är en variabel. På så sätt kan du skapa dynamiska frågor utan att hårdkoda egenskapsnamn.
Anta till exempel att exempeldataströmmen måste kopplas till referensdata som innehåller tröskelvärden för varje enhetssensor. Ett kodfragment med sådana referensdata visas nedan.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Målet här är att koppla vår exempeldatauppsättning överst i artikeln till dessa referensdata och mata ut en händelse för varje sensormått över tröskelvärdet. Det innebär att vår enda händelse ovan kan generera flera utdatahändelser om flera sensorer ligger över sina respektive tröskelvärden tack vare kopplingen. Om du vill uppnå liknande resultat utan koppling kan du läsa avsnittet nedan.
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue väljer egenskapen i SensorReadings, vilket namn matchar egenskapsnamnet som kommer från referensdata. Sedan extraheras det associerade värdet från SensorReadings .
Resultatet är:
Deviceid | SensorName | AlertMessage |
---|---|---|
12345 | Luftfuktighet | Avisering: Sensorn överskrider tröskelvärdet |
Konvertera postfält till separata händelser
Om du vill konvertera postfält till separata händelser använder du apply-operatorn tillsammans med funktionen GetRecordProperties .
Med ursprungliga exempeldata kan följande fråga användas för att extrahera egenskaper till olika händelser.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Resultatet är:
Deviceid | SensorName | AlertMessage |
---|---|---|
12345 | Temperatur | 80 |
12345 | Luftfuktighet | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [objektobjekt] |
Med hjälp av WITH kan du sedan dirigera dessa händelser till olika mål:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Parsa JSON-post i SQL-referensdata
När du använder Azure SQL Database som referensdata i jobbet kan du ha en kolumn som har data i JSON-format. Ett exempel på detta visas nedan.
Deviceid | Data |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
Du kan parsa JSON-posten i kolumnen Data genom att skriva en enkel användardefinierad JavaScript-funktion.
function parseJson(string) {
return JSON.parse(string);
}
Du kan sedan skapa ett steg i Stream Analytics-frågan enligt nedan för att komma åt fälten i dina JSON-poster.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Matrisdatatyper
Matrisdatatyper är en ordnad samling värden. Några typiska åtgärder för matrisvärden beskrivs nedan. I de här exemplen används funktionerna GetArrayElement, GetArrayElements, GetArrayLength och APPLY-operatorn .
Här är ett exempel på en händelse. Både CustomSensor03
och SensorMetadata
är av typen matris:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Arbeta med ett specifikt matriselement
Välj matriselement vid ett angivet index (välj det första matriselementet):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Resultatet är:
firstElement |
---|
12 |
Välj matrislängd
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Resultatet är:
arrayLength |
---|
3 |
Konvertera matriselement till separata händelser
Markera alla matriselement som enskilda händelser. APPLY-operatorn tillsammans med den inbyggda funktionen GetArrayElements extraherar alla matriselement som enskilda händelser:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Resultatet är:
DeviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Resultatet är:
DeviceId | smKey | smValue |
---|---|---|
12345 | Tillverkare | ABC |
12345 | Version | 1.2.45 |
Om de extraherade fälten måste visas i kolumner är det möjligt att pivoteras datauppsättningen med hjälp av WITH-syntaxen utöver JOIN-åtgärden . Den kopplingen kräver ett tidsgränsvillkor som förhindrar duplicering:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Resultatet är:
DeviceId | Lat | Lång | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |