Delen via


JSON- en Avro-gegevens parseren in Azure Stream Analytics

De Azure Stream Analytics-service ondersteunt het verwerken van gebeurtenissen in CSV-, JSON- en Avro-gegevensindelingen. Zowel JSON- als Avro-gegevens kunnen worden gestructureerd en bevatten enkele complexe typen, zoals geneste objecten (records) en matrices.

Notitie

AVRO-bestanden die zijn gemaakt door Event Hubs Capture gebruiken een specifieke indeling waarvoor u de aangepaste deserializer-functie moet gebruiken. Zie Invoer lezen in elke indeling met aangepaste .NET-deserializers voor meer informatie.

Gegevenstypen vastleggen

Recordgegevenstypen worden gebruikt om JSON- en Avro-matrices weer te geven wanneer bijbehorende indelingen worden gebruikt in de invoergegevensstromen. Deze voorbeelden laten een voorbeeldsensor zien, die invoergebeurtenissen leest in JSON-indeling. Hier volgt een voorbeeld van één gebeurtenis:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Geneste velden in een bekend schema openen

Gebruik punt notatie (.) om eenvoudig geneste velden rechtstreeks vanuit uw query te openen. Met deze query worden bijvoorbeeld de coördinaten voor breedtegraad en lengtegraad geselecteerd onder de eigenschap Locatie in de voorgaande JSON-gegevens. De puntnotatie kan worden gebruikt om door meerdere niveaus te navigeren, zoals wordt weergegeven in het volgende fragment:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Het resultaat is:

DeviceID Lat Lang Temperatuur Versie
12345 47 122 80 1.2.45

Alle eigenschappen selecteren

U kunt alle eigenschappen van een geneste record selecteren met behulp van het jokerteken '*'. Kijk een naar het volgende voorbeeld:

SELECT
    DeviceID,
    Location.*
FROM input

Het resultaat is:

DeviceID Lat Lang
12345 47 122

Geneste velden openen wanneer eigenschapsnaam een variabele is

Gebruik de functie GetRecordPropertyValue als de eigenschapsnaam een variabele is. Hiermee kunt u dynamische query's maken zonder de namen van eigenschappen van hardcodering.

Stel dat de voorbeeldgegevensstroom moet worden gekoppeld aan referentiegegevens met drempelwaarden voor elke apparaatsensor. Een fragment van dergelijke referentiegegevens wordt weergegeven in het volgende fragment.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Het doel is om onze voorbeeldgegevensset boven aan het artikel toe te voegen aan die referentiegegevens en één gebeurtenis uit te voeren voor elke sensormeting boven de drempelwaarde. Dat betekent dat onze bovenstaande gebeurtenis meerdere uitvoergebeurtenissen kan genereren als meerdere sensoren boven hun respectieve drempelwaarden liggen, dankzij de join. Zie het volgende voorbeeld om vergelijkbare resultaten zonder join te bereiken:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue selecteert de eigenschap in SensorReadings, die overeenkomt met de naam van de eigenschap die afkomstig is van de referentiegegevens. Vervolgens wordt de bijbehorende waarde uit SensorReadings geëxtraheerd.

Het resultaat is:

DeviceID SensorName AlertMessage
12345 Vochtigheid Waarschuwing: Sensor boven drempelwaarde

Recordvelden converteren naar afzonderlijke gebeurtenissen

Als u recordvelden wilt converteren naar afzonderlijke gebeurtenissen, gebruikt u de operator APPLY samen met de functie GetRecordProperties .

Met de oorspronkelijke voorbeeldgegevens kan de volgende query worden gebruikt om eigenschappen in verschillende gebeurtenissen te extraheren.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Het resultaat is:

DeviceID SensorName AlertMessage
12345 Temperatuur 80
12345 Vochtigheid 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [objectobject]

Met BEHULP van WITH is het vervolgens mogelijk om deze gebeurtenissen naar verschillende bestemmingen te routeren:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

JSON-record parseren in SQL-referentiegegevens

Wanneer u Azure SQL Database gebruikt als referentiegegevens in uw taak, is het mogelijk om een kolom met gegevens in JSON-indeling te hebben. In het volgende voorbeeld wordt een voorbeeld weergegeven:

DeviceID Gegevens
12345 {"key": "value1"}
54321 {"key": "value2"}

U kunt de JSON-record in de kolom Gegevens parseren door een eenvoudige door de gebruiker gedefinieerde JavaScript-functie te schrijven.

function parseJson(string) {
return JSON.parse(string);
}

U kunt vervolgens een stap maken in uw Stream Analytics-query, zoals hier wordt weergegeven voor toegang tot de velden van uw JSON-records.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Matrixgegevenstypen

Matrixgegevenstypen zijn een geordende verzameling waarden. Hier worden enkele typische bewerkingen voor matrixwaarden beschreven. In deze voorbeelden worden de functies GetArrayElement, GetArrayElements, GetArrayLength en de operator APPLY gebruikt.

Hier volgt een voorbeeld van een gebeurtenis. Beide CustomSensor03 en SensorMetadata zijn van het type matrix:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Werken met een specifiek matrixelement

Matrixelement selecteren in een opgegeven index (het eerste matrixelement selecteren):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Het resultaat is:

firstElement
12

Matrixlengte selecteren

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Het resultaat is:

arrayLength
3

Matrixelementen converteren naar afzonderlijke gebeurtenissen

Selecteer alle matrixelementen als afzonderlijke gebeurtenissen. De operator APPLY samen met de ingebouwde functie GetArrayElements extraheert alle matrixelementen als afzonderlijke gebeurtenissen:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Het resultaat is:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Het resultaat is:

DeviceId smKey smValue
12345 Fabrikant ABC
12345 Versie 1.2.45

Als de geëxtraheerde velden in kolommen moeten worden weergegeven, is het mogelijk om de gegevensset te draaien met behulp van de WITH-syntaxis naast de JOIN-bewerking . Voor die join is een voorwaarde voor tijdgrens vereist waarmee duplicatie wordt voorkomen:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Het resultaat is:

DeviceId Lat Lang smVersion smManufacturer
12345 47 122 1.2.45 ABC

Gegevenstypen in Azure Stream Analytics