Analýza dat JSON a Avro ve službě Azure Stream Analytics
Služba Azure Stream Analytics podporuje zpracování událostí ve formátech dat CSV, JSON a Avro. Data JSON i Avro můžou být strukturovaná a obsahují některé komplexní typy, jako jsou vnořené objekty (záznamy) a pole.
Poznámka:
Soubory AVRO vytvořené službou Event Hubs Capture používají konkrétní formát, který vyžaduje použití vlastní funkce deserializátoru . Další informace naleznete v tématu Čtení vstupu v libovolném formátu pomocí vlastních deserializerů .NET.
Záznam datových typů
Datové typy záznamů se používají k reprezentaci polí JSON a Avro, pokud se ve vstupních datových proudech používají odpovídající formáty. Tyto příklady ukazují ukázkový senzor, který čte vstupní události ve formátu JSON. Tady je příklad jedné události:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Přístup k vnořeným polím ve známém schématu
Pomocí tečkovaného zápisu (.) můžete snadno získat přístup k vnořeným polím přímo z dotazu. Tento dotaz například vybere souřadnice zeměpisné šířky a délky ve vlastnosti Umístění v předchozích datech JSON. Zápis tečky lze použít k procházení více úrovní, jak je znázorněno v následujícím fragmentu kódu:
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Výsledkem je:
ID zařízení | Lat | Dlouhé celé číslo | Teplota | Verze |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Vybrat všechny vlastnosti
Pomocí zástupného znaku *můžete vybrat všechny vlastnosti vnořeného záznamu. Představte si následující příklad:
SELECT
DeviceID,
Location.*
FROM input
Výsledkem je:
ID zařízení | Lat | Dlouhé celé číslo |
---|---|---|
12345 | 47 | 122 |
Přístup k vnořeným polím, pokud je název vlastnosti proměnnou
Pokud je název vlastnosti proměnnou, použijte funkci GetRecordPropertyValue. Umožňuje vytvářet dynamické dotazy bez pevně zakódovaných názvů vlastností.
Představte si například, že se ukázkový datový proud musí spojit s referenčními daty obsahujícími prahové hodnoty pro každý senzor zařízení. Fragment těchto referenčních dat se zobrazí v následujícím fragmentu kódu.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
Cílem je spojit naši ukázkovou datovou sadu z horní části článku s referenčními daty a vytvořit výstup jedné události pro každou míru senzoru nad její prahovou hodnotu. To znamená, že naše jedna výše uvedená událost může generovat více výstupních událostí, pokud je více senzorů nad příslušnými prahovými hodnotami, a to díky spojení. Pokud chcete dosáhnout podobných výsledků bez spojení, podívejte se na následující příklad:
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue vybere vlastnost v SensorReadings, který název odpovídá názvu vlastnosti pocházející z referenčních dat. Pak se extrahuje přidružená hodnota ze SensorReadings .
Výsledkem je:
ID zařízení | SensorName | AlertMessage |
---|---|---|
12345 | Vlhkost | Upozornění: Senzor nad prahovou hodnotou |
Převod polí záznamů na samostatné události
Chcete-li převést pole záznamů na samostatné události, použijte operátor APPLY společně s funkcí GetRecordProperties .
S původními ukázkovými daty je možné použít následující dotaz k extrakci vlastností do různých událostí.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Výsledkem je:
ID zařízení | SensorName | AlertMessage |
---|---|---|
12345 | Teplota | 80 |
12345 | Vlhkost | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Pomocí funkce WITH je pak možné tyto události směrovat do různých cílů:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Parsování záznamu JSON v referenčních datech SQL
Při použití Azure SQL Database jako referenčních dat ve vaší úloze je možné mít sloupec, který obsahuje data ve formátu JSON. Příklad je znázorněn v následujícím příkladu:
ID zařízení | Data |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
Záznam JSON ve sloupci Data můžete analyzovat napsáním jednoduché uživatelem definované funkce JavaScriptu.
function parseJson(string) {
return JSON.parse(string);
}
Potom můžete vytvořit krok v dotazu Stream Analytics, jak je znázorněno tady, abyste měli přístup k polím záznamů JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Datové typy pole
Datové typy pole jsou seřazenou kolekcí hodnot. Tady jsou podrobně popsány některé typické operace s hodnotami pole. Tyto příklady používají funkce GetArrayElement, GetArrayElements, GetArrayLength a APPLY operátor.
Tady je příklad události. Oba CustomSensor03
typy SensorMetadata
polí:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Práce s konkrétním prvkem pole
Výběr prvku pole v zadaném indexu (výběr prvního prvku pole):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Výsledkem je:
firstElement |
---|
12 |
Vybrat délku pole
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Výsledkem je:
arrayLength |
---|
3 |
Převod prvků pole na samostatné události
Vyberte všechny prvky pole jako jednotlivé události. Operátor APPLY společně s integrovanou funkcí GetArrayElements extrahuje všechny prvky pole jako jednotlivé události:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Výsledkem je:
DeviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 0 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Výsledkem je:
DeviceId | smKey | smValue |
---|---|---|
12345 | Výrobce | ABC |
12345 | Verze | 1.2.45 |
Pokud se extrahovaná pole musí zobrazit ve sloupcích, je možné datovou sadu kromě operace JOIN převést pomocí syntaxe WITH. Toto spojení vyžaduje podmínku časové hranice , která brání duplikaci:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Výsledkem je:
DeviceId | Lat | Dlouhé celé číslo | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |