Condividi tramite


Analizzare i tipi di dati complessi in Azure Synapse Analytics

Questo articolo è pertinente per file e contenitori Parquet in Collegamento ad Azure Synapse per Azure Cosmos DB. È possibile usare Spark o SQL per leggere o trasformare i dati con schemi complessi, ad esempio matrici o strutture annidate. L'esempio seguente viene completato con un singolo documento, ma può essere facilmente ridimensionato a miliardi di documenti con Spark o SQL. Il codice incluso in questo articolo usa PySpark (Python).

Caso d'uso

I tipi di dati complessi sono sempre più comuni e rappresentano una sfida per i data engineer. L'analisi di schemi annidati e matrici può comportare query SQL complesse e dispendiose in termini di tempo. Inoltre, può essere difficile rinominare o eseguire il cast del tipo di dati delle colonne annidate. Inoltre, quando si lavora con oggetti annidati in modo approfondito, è possibile riscontrare problemi di prestazioni.

I data engineer devono comprendere come elaborare in modo efficiente i tipi di dati complessi e renderli facilmente accessibili a tutti. Nell'esempio seguente si usa Spark in Azure Synapse Analytics per leggere e trasformare gli oggetti in una struttura piatta tramite frame di dati. Usare il modello serverless di SQL in Azure Synapse Analytics per eseguire direttamente query su tali oggetti e restituire tali risultati come tabella normale.

Che cosa sono le matrici e le strutture annidate?

L'oggetto seguente proviene da Application Insights. In questo oggetto sono presenti strutture annidate e matrici che contengono strutture annidate.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Esempio di schema di matrici e strutture annidate

Quando si stampa lo schema del frame di dati dell'oggetto (denominato df) con il comando df.printschema, viene visualizzata la rappresentazione seguente:

  • Il giallo rappresenta strutture annidate.
  • Il verde rappresenta una matrice con due elementi.

Codice con evidenziazione gialla e verde, che mostra l'origine dello schema

_rid, _tse _etag sono stati aggiunti al sistema durante l'inserimento del documento nell'archivio transazionale di Azure Cosmos DB.

Il frame di dati precedente conta solo 5 colonne e 1 riga. Dopo la trasformazione, il frame di dati curato avrà 13 colonne e 2 righe, in un formato tabulare.

Appiattire le strutture annidate ed espandere matrici

Con Spark in Azure Synapse Analytics, è facile trasformare le strutture annidate in colonne ed elementi di matrice in più righe. Seguire questa procedura per l'implementazione.

Diagramma di flusso che mostra i passaggi per le trasformazioni Spark

Definire una funzione per rendere flat lo schema annidato

È possibile usare questa funzione senza modifiche. Creare una cella in un notebook PySpark con la funzione seguente:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Usare la funzione per rendere flat lo schema annidato

In questo passaggio si appiattisce lo schema annidato del frame di dati (df) in un nuovo frame di dati (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

La funzione di visualizzazione deve restituire 10 colonne e 1 riga. La matrice e i relativi elementi annidati sono ancora presenti.

Trasformare la matrice

In questo caso, si trasforma la matrice, context_custom_dimensions, nel frame di dati df_flat, in un nuovo frame di dati df_flat_explode. Nel codice seguente si definisce anche la colonna da selezionare:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

La funzione di visualizzazione deve restituire 10 colonne e 2 righe. Il passaggio successivo consiste nell'appiattire gli schemi annidati con la funzione definita nel passaggio 1.

Usare la funzione per rendere flat lo schema annidato

Infine, si usa la funzione per rendere flat lo schema annidato del frame di dati df_flat_explode, in un nuovo frame di dati, df_flat_explode_flat:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

La funzione di visualizzazione dovrebbe mostrare 13 colonne e 2 righe.

La funzione printSchema del frame di dati df_flat_explode_flat restituisce il risultato seguente:

Codice che mostra lo schema finale

Leggere direttamente matrici e strutture annidate

Con il modello serverless di SQL, è possibile eseguire query e creare viste e tabelle su tali oggetti.

In primo luogo, a seconda della modalità di archiviazione dei dati, gli utenti devono usare la tassonomia seguente. Tutti gli elementi visualizzati in lettere maiuscole sono specifici del caso d'uso:

In blocco Formato
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' 'Parquet' (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' 'CosmosDB' (collegamento ad Azure Synapse)

Sostituire ogni campo come segue:

  • 'YOUR BULK ABOVE' è la stringa di connessione dell'origine dati a cui ci si connette.
  • 'YOUR TYPE ABOVE' è il formato usato per connettersi all'origine.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Esistono due tipi diversi di operazioni:

  • Il primo tipo di operazione è indicato nella riga di codice seguente, che definisce la colonna denominata contextdataeventTime che fa riferimento all'elemento annidato, Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Questa riga definisce la colonna denominata contextdataeventTime che fa riferimento all'elemento annidato, Context>Data>eventTime.

  • Il secondo tipo di operazione usa cross apply per creare nuove righe per ogni elemento nella matrice. Definisce quindi ogni oggetto annidato.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Se la matrice ha 5 elementi con 4 strutture annidate, il modello serverless di SQL restituisce 5 righe e 4 colonne. Il modello serverless di SQL può eseguire query sul posto, mappare la matrice in 2 righe e visualizzare tutte le strutture annidate in colonne.

Passaggi successivi