Sdílet prostřednictvím


Analýza dat JSON a Avro ve službě Azure Stream Analytics

Azure Stream Analytics podporuje zpracování událostí v datových formátech CSV, JSON a Avro. Data JSON i Avro mohou být strukturovaná a mohou obsahovat některé složité typy, jako jsou vnořené objekty (záznamy) a pole.

Poznámka

Soubory AVRO vytvořené službou Event Hub Capture používají konkrétní formát, který vyžaduje, abyste použili funkci vlastního deserializátoru . Další informace najdete v tématu Čtení vstupu v libovolném formátu pomocí vlastních deserializérů .NET.

Datové typy záznamů

Datové typy záznamů se používají k reprezentaci polí JSON a Avro, pokud se ve vstupních datových proudech používají odpovídající formáty. Tyto příklady ukazují ukázkový senzor, který čte vstupní události ve formátu JSON. Tady je příklad jedné události:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Přístup k vnořeným polím ve známém schématu

Pomocí tečkované notace (.) můžete snadno přistupovat k vnořeným polím přímo z dotazu. Tento dotaz například vybere souřadnice Zeměpisná šířka a Zeměpisná délka ve vlastnosti Location v předchozích datech JSON. Tečka se dá použít k navigaci na více úrovních, jak je znázorněno níže.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Výsledek je:

Deviceid Lat Dlouhé Teplota Verze
12345 47 122 80 1.2.45

Vybrat všechny vlastnosti

Pomocí zástupného znaku *můžete vybrat všechny vlastnosti vnořeného záznamu. Uvažujte následující příklad:

SELECT
    DeviceID,
    Location.*
FROM input

Výsledek je:

Deviceid Lat Dlouhé
12345 47 122

Přístup k vnořeným polím, pokud je název vlastnosti proměnnou

Pokud je název vlastnosti proměnná, použijte funkci GetRecordPropertyValue . To umožňuje vytvářet dynamické dotazy bez pevných názvů vlastností.

Představte si například, že ukázkový datový proud musí být spojený s referenčními daty obsahujícími prahové hodnoty pro každý senzor zařízení. Fragment takových referenčních dat je zobrazen níže.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Cílem je spojit naši ukázkovou datovou sadu z horní části článku s referenčními daty a výstupem jedné události pro každou míru senzoru nad její prahovou hodnotou. To znamená, že naše jedna výše uvedená událost může generovat více výstupních událostí, pokud je více senzorů nad příslušnými prahovými hodnotami, díky spojení. Pokud chcete dosáhnout podobných výsledků bez spojení, přečtěte si následující část.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue vybere vlastnost v SensorReadings, která se shoduje s názvem vlastnosti pocházejícím z referenčních dat. Pak se extrahuje přidružená hodnota z SensorReadings .

Výsledek je:

Deviceid Název senzoru AlertMessage
12345 Vlhkost Upozornění: Senzor nad prahovou hodnotou

Převod polí záznamu na samostatné události

Pokud chcete převést pole záznamů na samostatné události, použijte operátor APPLY společně s funkcí GetRecordProperties .

S původními ukázkovými daty je možné k extrakci vlastností do různých událostí použít následující dotaz.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Výsledek je:

Deviceid Název senzoru AlertMessage
12345 Teplota 80
12345 Vlhkost 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Pomocí funkce WITH je pak možné tyto události směrovat do různých cílů:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Parsovat záznam JSON v referenčních datech SQL

Při použití Azure SQL Database jako referenčních dat v úloze je možné mít sloupec, který obsahuje data ve formátu JSON. Příklad najdete níže.

Deviceid Data
12345 {"key": "value1"}
54321 {"key": "value2"}

Záznam JSON ve sloupci Data můžete analyzovat zápisem jednoduché uživatelem definované funkce JavaScriptu.

function parseJson(string) {
return JSON.parse(string);
}

V dotazu Stream Analytics pak můžete vytvořit krok, jak je znázorněno níže, abyste měli přístup k polím vašich záznamů JSON.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Datové typy pole

Datové typy pole jsou uspořádanou kolekcí hodnot. Některé typické operace s hodnotami polí jsou podrobně popsané níže. Tyto příklady používají funkce GetArrayElement, GetArrayElements, GetArrayLength a APPLY operátor.

Tady je příklad události. Oba typy CustomSensor03 a SensorMetadata jsou typu matice:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Práce s konkrétním prvkem pole

Select array element at a specified index (select the first array element):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Výsledek je:

firstElement
12

Výběr délky pole

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Výsledek je následující:

poleLength
3

Převod elementů pole na samostatné události

Vyberte všechny prvky pole jako jednotlivé události. Operátor APPLY spolu s předdefinované funkce GetArrayElements extrahuje všechny prvky pole jako jednotlivé události:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Výsledek je následující:

DeviceId Arrayindex Hodnota pole
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Výsledek je následující:

DeviceId smKey smValue
12345 Manufacturer ABC
12345 Verze 1.2.45

Pokud se extrahovaná pole musí zobrazit ve sloupcích, je možné datovou sadu kromě operace JOIN převést pomocí syntaxe WITH. Toto spojení vyžaduje podmínku časového ohraničení , která zabraňuje duplikaci:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Výsledek je následující:

DeviceId Lat Dlouhé smVersion smManufacturer
12345 47 122 1.2.45 ABC

Viz také

Datové typy v Azure Stream Analytics