Analizzare dati JSON e Avro in Analisi di flusso di Azure
Il servizio Analisi di flusso di Azure supporta l'elaborazione di eventi in formati di dati CSV, JSON e Avro. Entrambi i dati JSON e Avro possono essere strutturati e possono contenere alcuni tipi complessi come oggetti annidati (record) e matrici.
Nota
I file AVRO creati da Acquisizione di Hub eventi usano un formato specifico che richiede l'uso della funzionalità deserializzatore personalizzata. Per altre informazioni, vedere Leggere input in qualsiasi formato tramite deserializzatori personalizzati .NET.
Tipi di dati record
I tipi di dati record vengono usati per rappresentare le matrici JSON e Avro quando vengono usati formati corrispondenti nei flussi di dati di input. Questi esempi illustrano un sensore di esempio che legge gli eventi di input in formato JSON. Di seguito è riportato un esempio di un singolo evento:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
Accedere ai campi annidati nello schema noto
Usare la notazione con il punto (.) per accedere facilmente ai campi annidati direttamente dalla query. Ad esempio, questa query seleziona le coordinate di latitudine e longitudine nella proprietà Location dei dati JSON precedenti. La notazione del punto può essere usata per spostarsi tra più livelli, come illustrato nel frammento di codice seguente:
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
Il risultato è:
DeviceID | Lat | Lungo | Temperatura | Versione |
---|---|---|---|---|
12345 | 47 | 122 | 80 | 1.2.45 |
Selezionare tutte le proprietà
È possibile selezionare tutte le proprietà di un record annidato con il carattere jolly asterisco (*). Si consideri l'esempio seguente:
SELECT
DeviceID,
Location.*
FROM input
Il risultato è:
DeviceID | Lat | Lungo |
---|---|---|
12345 | 47 | 122 |
Accedere ai campi annidati quando il nome della proprietà è una variabile
Usare la funzione GetRecordPropertyValue se il nome della proprietà è una variabile. Consente di creare query dinamiche senza nomi di proprietà hardcoding.
Si supponga che un flusso di dati di esempio debba essere unito tramite join a dati di riferimento contenenti soglie per ogni sensore. Un frammento di tali dati di riferimento è illustrato nel frammento di codice seguente.
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
L'obiettivo è unire il set di dati di esempio dalla parte superiore dell'articolo ai dati di riferimento e restituire un evento per ogni misura del sensore al di sopra della soglia. Questo significa che il singolo evento precedente può generare più eventi di output se più sensori superano le rispettive soglie, grazie al join. Per ottenere risultati simili senza un join, vedere l'esempio seguente:
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue seleziona la proprietà in SensorReadings, il cui nome corrisponde al nome della proprietà proveniente dai dati di riferimento. Quindi viene estratto il valore associato da SensorReadings.
Il risultato è:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | Umidità | Avviso: sensore al di sopra della soglia |
Convertire i campi dei record in eventi distinti
Per convertire i campi di record in eventi separati, usare l'operatore APPLY insieme alla funzione GetRecordProperties.
Con i dati di esempio originali, è possibile usare la query seguente per estrarre le proprietà in eventi diversi.
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
Il risultato è:
DeviceID | SensorName | AlertMessage |
---|---|---|
12345 | Temperatura | 80 |
12345 | Umidità | 70 |
12345 | CustomSensor01 | 5 |
12345 | CustomSensor02 | 99 |
12345 | SensorMetadata | [object Object] |
Usando WITH, è quindi possibile instradare tali eventi a destinazioni diverse:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
Analizzare il record JSON nei dati di riferimento SQL
Quando si usa Database SQL di Azure come dati di riferimento nel processo, è possibile che una colonna contenga dati in formato JSON. Un esempio è illustrato nell'esempio seguente:
DeviceID | Dati |
---|---|
12345 | {"key": "value1"} |
54321 | {"key": "value2"} |
È possibile analizzare il record JSON nella colonna Data scrivendo una semplice funzione JavaScript definita dall'utente.
function parseJson(string) {
return JSON.parse(string);
}
È quindi possibile creare un passaggio nella query di Analisi di flusso, come illustrato di seguito per accedere ai campi dei record JSON.
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
Tipi di dati matrice
I tipi di dati matrice sono una raccolta ordinata di valori. Alcune operazioni tipiche sui valori di matrice sono descritte qui. Questi esempi usano le funzioni GetArrayElement, GetArrayElements, GetArrayLength e l'operatore APPLY.
Ecco un esempio di evento. Sia CustomSensor03
che SensorMetadata
sono di tipo matrice:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
Uso di un elemento di matrice specifico
Selezionare un elemento della matrice in corrispondenza dell'indice specificato (selezione del primo elemento della matrice):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
Il risultato è:
firstElement |
---|
12 |
Selezionare la lunghezza della matrice
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
Il risultato è:
arrayLength |
---|
3 |
Convertire gli elementi di matrice in eventi distinti
Selezionare tutti gli elementi della matrice come singoli eventi. L'operatore APPLY, usato insieme alla funzione integrata GetArrayElements, estrae tutti gli elementi della matrice come singoli eventi:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
Il risultato è:
DeviceId | ArrayIndex | ArrayValue |
---|---|---|
12345 | 0 | 12 |
12345 | 1 | -5 |
12345 | 2 | 0 |
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
Il risultato è:
DeviceId | smKey | smValue |
---|---|---|
12345 | Produttore | ABC |
12345 | Versione | 1.2.45 |
Se i campi estratti devono essere visualizzati nelle colonne, è possibile pivotare il set di dati usando la sintassi WITH oltre all'operazione JOIN . Tale join richiede una condizione limite temporale che impedisce la duplicazione:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
Il risultato è:
DeviceId | Lat | Lungo | smVersion | smManufacturer |
---|---|---|---|---|
12345 | 47 | 122 | 1.2.45 | ABC |
Contenuto correlato
Data Types in Azure Stream Analytics (Tipi di dati in Analisi di flusso di Azure)