Compartilhar via


Analisar os tipos de dados complexos no Azure Synapse Analytics

Este artigo é relevante para arquivos e contêineres Parquet noAzure Synapse para Azure Cosmos DB. Você pode usar o Spark ou a linguagem SQL para a leitura ou a transformação de dados por meio de esquemas complexos, tal como as matrizes ou estruturas aninhadas. O exemplo seguinte é concluído com um único documento, mas pode ser facilmente dimensionado a bilhões de documentos por meio do Spark ou SQL. O código incluso neste artigo usa PySpark (Python).

Caso de uso

Os tipos de dados complexos estão cada vez mais comuns e representam um desafio para os engenheiros de dados. Analisando o esquema de Modelo de Dados de Entidade aninhado e as matrizes pode implicar em consultas SQL e complexas que exigem tempo. Além disso, pode ser difícil renomear ou converter o tipo de dados das colunas aninhadas. Além disso, quando você estiver trabalhando com os objetos profundamente aninhados, poderá encontrar problemas de desempenho.

Os engenheiros de dados precisam reconhecer o processo de eficiência dos tipos de dados complexos e torná-los facilmente acessíveis a todos. No exemplo a seguir, você usa o Spark no Azure Synapse Analytics para leitura e transformação dos objetos em uma estrutura plana por meio de quadros de dados. Você usa o modulo sem servidor do SQL no Azure Synapse Analytics para consultar esses objetos diretamente e retornar a esse conjunto de resultados como um guia normal.

O que são matrizes e estruturas aninhadas?

O objeto SQL Server a seguir vem de Visual Studio Online Application insights. Nesse objeto, existem estruturas aninhadas e matrizes que contêm estruturas aninhadas.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

O exemplo de esquema de matrizes e estruturas aninhadas

Quando você estiver imprimindo o esquema do quadro de dados do objeto (chamado de DF) com o comando df.printschema, você presenciará da seguinte representação:

  • O amarelo representa as estruturas aninhadas.
  • O verde representa uma matriz com dois elementos.

O código com destaque de amarelo e verde, mostrando a origem do esquema

_rid,_tse_etagforam acrescentados ao sistema,visto que o documento foi ingerido no armazenamento transacional no Azure Cosmos DB.

O quadro de dados anterior conta simplesmente na quantidade de 5 colunas e 1 linha. Após a transformação, o quadro de dados coletado terá 13 colunas e 2 linhas, em um formato tabelar.

Nivelar as estruturas aninhadas e detalhar as matrizes

Com o Spark no Azure Synapse Analytics, é fácil transformar as estruturas aninhadas em colunas e os elementos de matriz em múltiplas linhas. Utilize as etapas a seguir para a implementação.

Fluxograma mostrando as etapas durante as transformações do Spark

Definir uma função para nivelar o esquema aninhado

Você poderá usar essa função sem alteração. Criar uma célula em um notebook da PySpark com a seguinte função:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Definir a função para nivelar o esquema aninhado

Nesta etapa, você nivela o esquema aninhado do quadro de dados (DF) em um novo quadro de dados ( df_flat ):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

A função de linguagem desenvolvida deve tabelar 10 colunas e 1 linha. A matriz e seus elementos aninhados estão lá.

Transformar as matrizes

Aqui, você transforma a matriz,context_custom_dimensions, no quadro de dados df_flatnum novo quadro de dados df_flat_explode. No código seguinte, você também define qual coluna selecionar:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

A função de linguagem desenvolvida deve tabelar 10 colunas e 2 linhas. A próxima etapa é nivelar os esquemas aninhados com a função definida na etapa 1.

Definir a função para nivelar o esquema aninhado

Por fim, você utiliza a função para nivelar o quadro de dados do esquema aninhado df_flat_explode em um novo quadro de dados, df_flat_explode_flat :

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

A função de linguagem exibida deve tabelar 13 colunas e 2 linhas.

A funçãoprintSchemado quadro de dadosdf_flat_explode_flat retorna o seguinte resultado:

O código mostrando o esquema final

A leitura das matrizes e das estruturas aninhadas diretamente

Com o módulo sem servidor do SQL, você pode consultar e criar exibições e tabelas em tais objetos.

Primeiro, dependendo de como os dados foram armazenados, os usuários devem usar a taxonomia seguinte. Tudo o que é mostrado em letras maiúsculas é específico ao caso de uso:

Em massa Formatar
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' ' Parquet '(ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;conta=NOMEDACONTA;bancodedados=NOMEDOBANCODEDADOS;coleção=NOMEDACOLEÇÃO;região=REGIÃOPARACONSULTAR', SENHA='SUASENHA' ' CosmosDB ' (Azure Synapse)

Substitua cada campo como a seguir:

  • ' SUA MASSA ACIMA ' é a cadeia de conexão da fonte de dados à qual você se conecta.
  • ' SEU TIPO ACIMA ' é o formato que você usa para se conectar à fonte de dados.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Há dois tipos diferentes de operações:

  • O primeiro tipo de operação é indicado na linha de código seguinte,que define a coluna chamadacontextdataeventTimeque se refere ao elemento aninhado,Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Esta linha define a coluna chamadacontextdataeventTime que se refere ao elemento aninhado,Context>Data>eventTime.

  • O segundo tipo de operação usa cross apply para criar novas linhas para cada elemento sob a matriz. Por conseguinte, ele define cada objeto aninhado.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Se a matriz tivesse 5 elementos com 4 estruturas aninhadas, o modelo sem servidor do SQL tabelará 5 linhas e 4 colunas. O modelo do SQL sem servidor pode ser consultado no local, mapear a matriz em 2 linhas e exibir todas as estruturas aninhadas em colunas.

Próximas etapas