Analýza dat JSON a Avro ve službě Azure Stream Analytics
Azure Stream Analytics podporuje zpracování událostí v datových formátech CSV, JSON a Avro. Data JSON i Avro mohou být strukturovaná a mohou obsahovat některé složité typy, jako jsou vnořené objekty (záznamy) a pole.
Poznámka
Soubory AVRO vytvořené službou Event Hub Capture používají konkrétní formát, který vyžaduje, abyste použili funkci vlastního deserializátoru . Další informace najdete v tématu Čtení vstupu v libovolném formátu pomocí vlastních deserializérů .NET.
Datové typy záznamů
Datové typy záznamů se používají k reprezentaci polí JSON a Avro, pokud se ve vstupních datových proudech používají odpovídající formáty. Tyto příklady ukazují ukázkový senzor, který čte vstupní události ve formátu JSON. Tady je příklad jedné události:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Přístup k vnořeným polím ve známém schématu
Pomocí tečkované notace (.) můžete snadno přistupovat k vnořeným polím přímo z dotazu. Tento dotaz například vybere souřadnice Zeměpisná šířka a Zeměpisná délka ve vlastnosti Location v předchozích datech JSON. Tečka se dá použít k navigaci na více úrovních, jak je znázorněno níže.
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Výsledek je:
Deviceid | Lat | Dlouhé | Teplota | Verze |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Vybrat všechny vlastnosti
Pomocí zástupného znaku *můžete vybrat všechny vlastnosti vnořeného záznamu. Uvažujte následující příklad:
SELECT
DeviceID,
Location.*
FROM input
Výsledek je:
Deviceid | Lat | Dlouhé |
---|---|---|
12345 | 47 | 122 |
Přístup k vnořeným polím, pokud je název vlastnosti proměnnou
Pokud je název vlastnosti proměnná, použijte funkci GetRecordPropertyValue . To umožňuje vytvářet dynamické dotazy bez pevných názvů vlastností.
Představte si například, že ukázkový datový proud musí být spojený s referenčními daty obsahujícími prahové hodnoty pro každý senzor zařízení. Fragment takových referenčních dat je zobrazen níže.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Cílem je spojit naši ukázkovou datovou sadu z horní části článku s referenčními daty a výstupem jedné události pro každou míru senzoru nad její prahovou hodnotou. To znamená, že naše jedna výše uvedená událost může generovat více výstupních událostí, pokud je více senzorů nad příslušnými prahovými hodnotami, díky spojení. Pokud chcete dosáhnout podobných výsledků bez spojení, přečtěte si následující část.
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue vybere vlastnost v SensorReadings, která se shoduje s názvem vlastnosti pocházejícím z referenčních dat. Pak se extrahuje přidružená hodnota z SensorReadings .
Výsledek je:
Deviceid | Název senzoru | AlertMessage |
---|---|---|
12345 | Vlhkost | Upozornění: Senzor nad prahovou hodnotou |
Převod polí záznamu na samostatné události
Pokud chcete převést pole záznamů na samostatné události, použijte operátor APPLY společně s funkcí GetRecordProperties .
S původními ukázkovými daty je možné k extrakci vlastností do různých událostí použít následující dotaz.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Výsledek je:
Deviceid | Název senzoru | AlertMessage |
---|---|---|
12345 | Teplota | 80 |
12345 | Vlhkost | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Pomocí funkce WITH je pak možné tyto události směrovat do různých cílů:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Parsovat záznam JSON v referenčních datech SQL
Při použití Azure SQL Database jako referenčních dat v úloze je možné mít sloupec, který obsahuje data ve formátu JSON. Příklad najdete níže.
Deviceid | Data |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
Záznam JSON ve sloupci Data můžete analyzovat zápisem jednoduché uživatelem definované funkce JavaScriptu.
function parseJson(string) {
return JSON.parse(string);
}
V dotazu Stream Analytics pak můžete vytvořit krok, jak je znázorněno níže, abyste měli přístup k polím vašich záznamů JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Datové typy pole
Datové typy pole jsou uspořádanou kolekcí hodnot. Některé typické operace s hodnotami polí jsou podrobně popsané níže. Tyto příklady používají funkce GetArrayElement, GetArrayElements, GetArrayLength a APPLY operátor.
Tady je příklad události. Oba typy CustomSensor03
a SensorMetadata
jsou typu matice:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Práce s konkrétním prvkem pole
Select array element at a specified index (select the first array element):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Výsledek je:
firstElement |
---|
12 |
Výběr délky pole
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Výsledek je následující:
poleLength |
---|
3 |
Převod elementů pole na samostatné události
Vyberte všechny prvky pole jako jednotlivé události. Operátor APPLY spolu s předdefinované funkce GetArrayElements extrahuje všechny prvky pole jako jednotlivé události:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Výsledek je následující:
DeviceId | Arrayindex | Hodnota pole |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Výsledek je následující:
DeviceId | smKey | smValue |
---|---|---|
12345 | Manufacturer | ABC |
12345 | Verze | 1.2.45 |
Pokud se extrahovaná pole musí zobrazit ve sloupcích, je možné datovou sadu kromě operace JOIN převést pomocí syntaxe WITH. Toto spojení vyžaduje podmínku časového ohraničení , která zabraňuje duplikaci:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Výsledek je následující:
DeviceId | Lat | Dlouhé | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |