Sdílet prostřednictvím


Analýza dat JSON a Avro ve službě Azure Stream Analytics

Služba Azure Stream Analytics podporuje zpracování událostí ve formátech dat CSV, JSON a Avro. Data JSON i Avro můžou být strukturovaná a obsahují některé komplexní typy, jako jsou vnořené objekty (záznamy) a pole.

Poznámka:

Soubory AVRO vytvořené službou Event Hubs Capture používají konkrétní formát, který vyžaduje použití vlastní funkce deserializátoru . Další informace naleznete v tématu Čtení vstupu v libovolném formátu pomocí vlastních deserializerů .NET.

Záznam datových typů

Datové typy záznamů se používají k reprezentaci polí JSON a Avro, pokud se ve vstupních datových proudech používají odpovídající formáty. Tyto příklady ukazují ukázkový senzor, který čte vstupní události ve formátu JSON. Tady je příklad jedné události:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Přístup k vnořeným polím ve známém schématu

Pomocí tečkovaného zápisu (.) můžete snadno získat přístup k vnořeným polím přímo z dotazu. Tento dotaz například vybere souřadnice zeměpisné šířky a délky ve vlastnosti Umístění v předchozích datech JSON. Zápis tečky lze použít k procházení více úrovní, jak je znázorněno v následujícím fragmentu kódu:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Výsledkem je:

ID zařízení Lat Dlouhé celé číslo Teplota Verze
12345 47 122 80 1.2.45

Vybrat všechny vlastnosti

Pomocí zástupného znaku *můžete vybrat všechny vlastnosti vnořeného záznamu. Představte si následující příklad:

SELECT
    DeviceID,
    Location.*
FROM input

Výsledkem je:

ID zařízení Lat Dlouhé celé číslo
12345 47 122

Přístup k vnořeným polím, pokud je název vlastnosti proměnnou

Pokud je název vlastnosti proměnnou, použijte funkci GetRecordPropertyValue. Umožňuje vytvářet dynamické dotazy bez pevně zakódovaných názvů vlastností.

Představte si například, že se ukázkový datový proud musí spojit s referenčními daty obsahujícími prahové hodnoty pro každý senzor zařízení. Fragment těchto referenčních dat se zobrazí v následujícím fragmentu kódu.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Cílem je spojit naši ukázkovou datovou sadu z horní části článku s referenčními daty a vytvořit výstup jedné události pro každou míru senzoru nad její prahovou hodnotu. To znamená, že naše jedna výše uvedená událost může generovat více výstupních událostí, pokud je více senzorů nad příslušnými prahovými hodnotami, a to díky spojení. Pokud chcete dosáhnout podobných výsledků bez spojení, podívejte se na následující příklad:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue vybere vlastnost v SensorReadings, který název odpovídá názvu vlastnosti pocházející z referenčních dat. Pak se extrahuje přidružená hodnota ze SensorReadings .

Výsledkem je:

ID zařízení SensorName AlertMessage
12345 Vlhkost Upozornění: Senzor nad prahovou hodnotou

Převod polí záznamů na samostatné události

Chcete-li převést pole záznamů na samostatné události, použijte operátor APPLY společně s funkcí GetRecordProperties .

S původními ukázkovými daty je možné použít následující dotaz k extrakci vlastností do různých událostí.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Výsledkem je:

ID zařízení SensorName AlertMessage
12345 Teplota 80
12345 Vlhkost 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Pomocí funkce WITH je pak možné tyto události směrovat do různých cílů:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Parsování záznamu JSON v referenčních datech SQL

Při použití Azure SQL Database jako referenčních dat ve vaší úloze je možné mít sloupec, který obsahuje data ve formátu JSON. Příklad je znázorněn v následujícím příkladu:

ID zařízení Data
12345 {"key": "value1"}
54321 {"key": "value2"}

Záznam JSON ve sloupci Data můžete analyzovat napsáním jednoduché uživatelem definované funkce JavaScriptu.

function parseJson(string) {
return JSON.parse(string);
}

Potom můžete vytvořit krok v dotazu Stream Analytics, jak je znázorněno tady, abyste měli přístup k polím záznamů JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Datové typy pole

Datové typy pole jsou seřazenou kolekcí hodnot. Tady jsou podrobně popsány některé typické operace s hodnotami pole. Tyto příklady používají funkce GetArrayElement, GetArrayElements, GetArrayLength a APPLY operátor.

Tady je příklad události. Oba CustomSensor03 typy SensorMetadatapolí:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Práce s konkrétním prvkem pole

Výběr prvku pole v zadaném indexu (výběr prvního prvku pole):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Výsledkem je:

firstElement
12

Vybrat délku pole

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Výsledkem je:

arrayLength
3

Převod prvků pole na samostatné události

Vyberte všechny prvky pole jako jednotlivé události. Operátor APPLY společně s integrovanou funkcí GetArrayElements extrahuje všechny prvky pole jako jednotlivé události:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Výsledkem je:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 0 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Výsledkem je:

DeviceId smKey smValue
12345 Výrobce ABC
12345 Verze 1.2.45

Pokud se extrahovaná pole musí zobrazit ve sloupcích, je možné datovou sadu kromě operace JOIN převést pomocí syntaxe WITH. Toto spojení vyžaduje podmínku časové hranice , která brání duplikaci:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Výsledkem je:

DeviceId Lat Dlouhé celé číslo smVersion smManufacturer
12345 47 122 1.2.45 ABC

Datové typy ve službě Azure Stream Analytics