Analisar tipos de dados complexos no Azure Synapse Analytics
Este artigo é relevante para arquivos e contêineres do Parquet no Azure Synapse Link for Azure Cosmos DB. Você pode usar o Spark ou o SQL para ler ou transformar dados com esquemas complexos, como matrizes ou estruturas aninhadas. O exemplo a seguir é concluído com um único documento, mas pode ser facilmente dimensionado para bilhões de documentos com o Spark ou SQL. O código incluído neste artigo usa PySpark (Python).
Caso de utilização
Tipos de dados complexos são cada vez mais comuns e representam um desafio para os engenheiros de dados. A análise de esquemas e matrizes aninhados pode envolver consultas SQL demoradas e complexas. Além disso, pode ser difícil renomear ou converter o tipo de dados de colunas aninhadas. Além disso, quando você está trabalhando com objetos profundamente aninhados, você pode encontrar problemas de desempenho.
Os engenheiros de dados precisam entender como processar com eficiência tipos de dados complexos e torná-los facilmente acessíveis a todos. No exemplo a seguir, você usa o Spark no Azure Synapse Analytics para ler e transformar objetos em uma estrutura plana por meio de quadros de dados. Você usa o modelo sem servidor do SQL no Azure Synapse Analytics para consultar esses objetos diretamente e retornar esses resultados como uma tabela regular.
O que são arrays e estruturas aninhadas?
O objeto a seguir vem do Application Insights. Neste objeto, há estruturas aninhadas e matrizes que contêm estruturas aninhadas.
{
"id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
"context": {
"data": {
"eventTime": "2020-06-10T13:43:34.553Z",
"samplingRate": "100.0",
"isSynthetic": "false"
},
"session": {
"isFirst": "false",
"id": "38619c14-7a23-4687-8268-95862c5326b1"
},
"custom": {
"dimensions": [
{
"customerInfo": {
"ProfileType": "ExpertUser",
"RoomName": "",
"CustomerName": "diamond",
"UserName": "XXXX@yahoo.com"
}
},
{
"customerInfo": {
"ProfileType": "Novice",
"RoomName": "",
"CustomerName": "topaz",
"UserName": "XXXX@outlook.com"
}
}
]
}
}
}
Exemplo de esquema de matrizes e estruturas aninhadas
Ao imprimir o esquema do quadro de dados do objeto (chamado df) com o comando df.printschema
, você verá a seguinte representação:
- O amarelo representa estruturas aninhadas.
- Verde representa uma matriz com dois elementos.
_rid
, _ts
e _etag
foram adicionados ao sistema à medida que o documento foi ingerido no repositório transacional do Azure Cosmos DB.
O quadro de dados anterior conta apenas para 5 colunas e 1 linha. Após a transformação, o quadro de dados curado terá 13 colunas e 2 linhas, em formato tabular.
Achatar estruturas aninhadas e explodir matrizes
Com o Spark no Azure Synapse Analytics, é fácil transformar estruturas aninhadas em colunas e elementos de matriz em várias linhas. Use as etapas a seguir para implementação.
Definir uma função para nivelar o esquema aninhado
Você pode usar esta função sem alterações. Crie uma célula em um bloco de anotações PySpark com a seguinte função:
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
Use a função para nivelar o esquema aninhado
Nesta etapa, você nivela o esquema aninhado do quadro de dados (df) em um novo quadro de dados (df_flat
):
from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))
A função de exibição deve retornar 10 colunas e 1 linha. A matriz e seus elementos aninhados ainda estão lá.
Transformar a matriz
Aqui, você transforma a matriz, context_custom_dimensions
no quadro df_flat
de dados, em um novo quadro de df_flat_explode
dados. No código a seguir, você também define qual coluna selecionar:
from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))
A função de exibição deve retornar 10 colunas e 2 linhas. A próxima etapa é nivelar esquemas aninhados com a função definida na etapa 1.
Use a função para nivelar o esquema aninhado
Finalmente, você usa a função para nivelar o esquema aninhado do quadro df_flat_explode
de dados , em um novo quadro de dados, df_flat_explode_flat
:
df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))
A função de exibição deve mostrar 13 colunas e 2 linhas.
A função printSchema
do quadro df_flat_explode_flat
de dados retorna o seguinte resultado:
Leia matrizes e estruturas aninhadas diretamente
Com o modelo sem servidor do SQL, você pode consultar e criar exibições e tabelas sobre esses objetos.
Primeiro, dependendo de como os dados foram armazenados, os usuários devem usar a seguinte taxonomia. Tudo o que é mostrado em maiúsculas é específico para o seu caso de uso:
Em massa | Formato |
---|---|
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' | 'Parquet' (ADLSg2) |
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; banco de dados = DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='SEUSEGREDO' | 'CosmosDB' (Azure Synapse Link) |
Substitua cada campo da seguinte forma:
- 'YOUR BULK ABOVE' é a cadeia de conexão da fonte de dados à qual você se conecta.
- 'SEU TIPO ACIMA' é o formato que você usa para se conectar à fonte.
select *
FROM
openrowset(
BULK 'YOUR BULK ABOVE',
FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
contextdataeventTime varchar(50) '$.context.data.eventTime',
contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
contextsessionisFirst varchar(50) '$.context.session.isFirst',
contextsessionid varchar(50) '$.context.session.id',
contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
RoomName varchar(50) '$.customerInfo.RoomName',
CustomerName varchar(50) '$.customerInfo.CustomerName',
UserName varchar(50) '$.customerInfo.UserName'
)
Existem dois tipos diferentes de operações:
O primeiro tipo de operação é indicado na seguinte linha de código, que define a coluna chamada
contextdataeventTime
que se refere ao elemento aninhado,Context.Data.eventTime
.contextdataeventTime varchar(50) '$.context.data.eventTime'
Esta linha define a coluna chamada
contextdataeventTime
que se refere ao elemento aninhado,Context>Data>eventTime
.O segundo tipo de operação usa
cross apply
para criar novas linhas para cada elemento sob a matriz. Em seguida, ele define cada objeto aninhado.cross apply openjson (contextcustomdimensions) with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
Se a matriz tinha 5 elementos com 4 estruturas aninhadas, o modelo sem servidor do SQL retorna 5 linhas e 4 colunas. O modelo sem servidor do SQL pode consultar no local, mapear a matriz em 2 linhas e exibir todas as estruturas aninhadas em colunas.