Analizzare i tipi di dati complessi in Azure Synapse Analytics
Questo articolo è pertinente per file e contenitori Parquet in Collegamento ad Azure Synapse per Azure Cosmos DB. È possibile usare Spark o SQL per leggere o trasformare i dati con schemi complessi, ad esempio matrici o strutture annidate. L'esempio seguente viene completato con un singolo documento, ma può essere facilmente ridimensionato a miliardi di documenti con Spark o SQL. Il codice incluso in questo articolo usa PySpark (Python).
Caso d'uso
I tipi di dati complessi sono sempre più comuni e rappresentano una sfida per i data engineer. L'analisi di schemi annidati e matrici può comportare query SQL complesse e dispendiose in termini di tempo. Inoltre, può essere difficile rinominare o eseguire il cast del tipo di dati delle colonne annidate. Inoltre, quando si lavora con oggetti annidati in modo approfondito, è possibile riscontrare problemi di prestazioni.
I data engineer devono comprendere come elaborare in modo efficiente i tipi di dati complessi e renderli facilmente accessibili a tutti. Nell'esempio seguente si usa Spark in Azure Synapse Analytics per leggere e trasformare gli oggetti in una struttura piatta tramite frame di dati. Usare il modello serverless di SQL in Azure Synapse Analytics per eseguire direttamente query su tali oggetti e restituire tali risultati come tabella normale.
Che cosa sono le matrici e le strutture annidate?
L'oggetto seguente proviene da Application Insights. In questo oggetto sono presenti strutture annidate e matrici che contengono strutture annidate.
{
"id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
"context": {
"data": {
"eventTime": "2020-06-10T13:43:34.553Z",
"samplingRate": "100.0",
"isSynthetic": "false"
},
"session": {
"isFirst": "false",
"id": "38619c14-7a23-4687-8268-95862c5326b1"
},
"custom": {
"dimensions": [
{
"customerInfo": {
"ProfileType": "ExpertUser",
"RoomName": "",
"CustomerName": "diamond",
"UserName": "XXXX@yahoo.com"
}
},
{
"customerInfo": {
"ProfileType": "Novice",
"RoomName": "",
"CustomerName": "topaz",
"UserName": "XXXX@outlook.com"
}
}
]
}
}
}
Esempio di schema di matrici e strutture annidate
Quando si stampa lo schema del frame di dati dell'oggetto (denominato df) con il comando df.printschema
, viene visualizzata la rappresentazione seguente:
- Il giallo rappresenta strutture annidate.
- Il verde rappresenta una matrice con due elementi.
_rid
, _ts
e _etag
sono stati aggiunti al sistema durante l'inserimento del documento nell'archivio transazionale di Azure Cosmos DB.
Il frame di dati precedente conta solo 5 colonne e 1 riga. Dopo la trasformazione, il frame di dati curato avrà 13 colonne e 2 righe, in un formato tabulare.
Appiattire le strutture annidate ed espandere matrici
Con Spark in Azure Synapse Analytics, è facile trasformare le strutture annidate in colonne ed elementi di matrice in più righe. Seguire questa procedura per l'implementazione.
Definire una funzione per rendere flat lo schema annidato
È possibile usare questa funzione senza modifiche. Creare una cella in un notebook PySpark con la funzione seguente:
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
Usare la funzione per rendere flat lo schema annidato
In questo passaggio si appiattisce lo schema annidato del frame di dati (df) in un nuovo frame di dati (df_flat
):
from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))
La funzione di visualizzazione deve restituire 10 colonne e 1 riga. La matrice e i relativi elementi annidati sono ancora presenti.
Trasformare la matrice
In questo caso, si trasforma la matrice, context_custom_dimensions
, nel frame di dati df_flat
, in un nuovo frame di dati df_flat_explode
. Nel codice seguente si definisce anche la colonna da selezionare:
from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))
La funzione di visualizzazione deve restituire 10 colonne e 2 righe. Il passaggio successivo consiste nell'appiattire gli schemi annidati con la funzione definita nel passaggio 1.
Usare la funzione per rendere flat lo schema annidato
Infine, si usa la funzione per rendere flat lo schema annidato del frame di dati df_flat_explode
, in un nuovo frame di dati, df_flat_explode_flat
:
df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))
La funzione di visualizzazione dovrebbe mostrare 13 colonne e 2 righe.
La funzione printSchema
del frame di dati df_flat_explode_flat
restituisce il risultato seguente:
Leggere direttamente matrici e strutture annidate
Con il modello serverless di SQL, è possibile eseguire query e creare viste e tabelle su tali oggetti.
In primo luogo, a seconda della modalità di archiviazione dei dati, gli utenti devono usare la tassonomia seguente. Tutti gli elementi visualizzati in lettere maiuscole sono specifici del caso d'uso:
In blocco | Formato |
---|---|
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' | 'Parquet' (ADLSg2) |
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' | 'CosmosDB' (collegamento ad Azure Synapse) |
Sostituire ogni campo come segue:
- 'YOUR BULK ABOVE' è la stringa di connessione dell'origine dati a cui ci si connette.
- 'YOUR TYPE ABOVE' è il formato usato per connettersi all'origine.
select *
FROM
openrowset(
BULK 'YOUR BULK ABOVE',
FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
contextdataeventTime varchar(50) '$.context.data.eventTime',
contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
contextsessionisFirst varchar(50) '$.context.session.isFirst',
contextsessionid varchar(50) '$.context.session.id',
contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
RoomName varchar(50) '$.customerInfo.RoomName',
CustomerName varchar(50) '$.customerInfo.CustomerName',
UserName varchar(50) '$.customerInfo.UserName'
)
Esistono due tipi diversi di operazioni:
Il primo tipo di operazione è indicato nella riga di codice seguente, che definisce la colonna denominata
contextdataeventTime
che fa riferimento all'elemento annidato,Context.Data.eventTime
.contextdataeventTime varchar(50) '$.context.data.eventTime'
Questa riga definisce la colonna denominata
contextdataeventTime
che fa riferimento all'elemento annidato,Context>Data>eventTime
.Il secondo tipo di operazione usa
cross apply
per creare nuove righe per ogni elemento nella matrice. Definisce quindi ogni oggetto annidato.cross apply openjson (contextcustomdimensions) with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
Se la matrice ha 5 elementi con 4 strutture annidate, il modello serverless di SQL restituisce 5 righe e 4 colonne. Il modello serverless di SQL può eseguire query sul posto, mappare la matrice in 2 righe e visualizzare tutte le strutture annidate in colonne.