Analýza složitých datových typů ve službě Azure Synapse Analytics
Tento článek je relevantní pro soubory a kontejnery Parquet ve službě Azure Synapse Link pro Azure Cosmos DB. Spark nebo SQL můžete použít ke čtení nebo transformaci dat pomocí složitých schémat, jako jsou pole nebo vnořené struktury. Následující příklad je dokončen s jedním dokumentem, ale může se snadno škálovat na miliardy dokumentů pomocí Sparku nebo SQL. Kód, který je součástí tohoto článku, používá PySpark (Python).
Případ použití
Složité datové typy jsou stále častější a představují výzvu pro datové inženýry. Analýza vnořených schémat a polí může zahrnovat časově náročné a složité dotazy SQL. Kromě toho může být obtížné přejmenovávat nebo přetypovat datový typ vnořených sloupců. Při práci s hluboko vnořenými objekty můžete také narazit na problémy s výkonem.
Datoví inženýři potřebují pochopit, jak efektivně zpracovávat složité datové typy a jak je snadno zpřístupnit všem. V následujícím příkladu použijete Spark ve službě Azure Synapse Analytics ke čtení a transformaci objektů do ploché struktury prostřednictvím datových rámců. K přímému dotazování těchto objektů použijete bezserverový model SQL v Azure Synapse Analytics a tyto výsledky vrátíte jako běžnou tabulku.
Co jsou pole a vnořené struktury?
Následující objekt pochází z Application Insights. V tomto objektu jsou vnořené struktury a pole, které obsahují vnořené struktury.
{
"id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
"context": {
"data": {
"eventTime": "2020-06-10T13:43:34.553Z",
"samplingRate": "100.0",
"isSynthetic": "false"
},
"session": {
"isFirst": "false",
"id": "38619c14-7a23-4687-8268-95862c5326b1"
},
"custom": {
"dimensions": [
{
"customerInfo": {
"ProfileType": "ExpertUser",
"RoomName": "",
"CustomerName": "diamond",
"UserName": "XXXX@yahoo.com"
}
},
{
"customerInfo": {
"ProfileType": "Novice",
"RoomName": "",
"CustomerName": "topaz",
"UserName": "XXXX@outlook.com"
}
}
]
}
}
}
Příklad schématu polí a vnořených struktur
Při tisku schématu datového rámce objektu (označovaného jako df) pomocí příkazu df.printschema
se zobrazí následující reprezentace:
- Žlutá představuje vnořené struktury.
- Zelená představuje pole se dvěma prvky.
_rid
, _ts
a _etag
byly přidány do systému, protože dokument byl ingestován do transakčního úložiště Azure Cosmos DB.
Předchozí datový rámec počítá pouze pro 5 sloupců a 1 řádek. Po transformaci bude kurátorovaný datový rámec obsahovat 13 sloupců a 2 řádky v tabulkovém formátu.
Zploštěné vnořené struktury a rozložené pole
Spark ve službě Azure Synapse Analytics umožňuje snadno transformovat vnořené struktury do sloupců a prvků pole na několik řádků. K implementaci použijte následující kroky.
Definování funkce pro zploštěné schéma
Tuto funkci můžete použít beze změny. Vytvořte buňku v poznámkovém bloku PySpark s následující funkcí:
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
Použití funkce k zploštění vnořeného schématu
V tomto kroku zploštěné schéma datového rámce (df) zploštěte do nového datového rámce (df_flat
):
from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))
Funkce zobrazení by měla vrátit 10 sloupců a 1 řádek. Pole a jeho vnořené prvky jsou stále tam.
Transformace pole
Zde transformujete pole v context_custom_dimensions
datovém rámci df_flat
na nový datový rámec df_flat_explode
. V následujícím kódu také definujete, který sloupec se má vybrat:
from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))
Funkce zobrazení by měla vrátit 10 sloupců a 2 řádky. Dalším krokem je zploštění vnořených schémat pomocí funkce definované v kroku 1.
Použití funkce k zploštění vnořeného schématu
Nakonec funkci použijete k zploštění vnořeného schématu datového rámce df_flat_explode
do nového datového rámce: df_flat_explode_flat
df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))
Funkce zobrazení by měla zobrazit 13 sloupců a 2 řádky.
Funkce printSchema
datového rámce df_flat_explode_flat
vrátí následující výsledek:
Čtení polí a vnořených struktur přímo
S bezserverovým modelem SQL můžete dotazovat a vytvářet zobrazení a tabulky nad těmito objekty.
Nejprve v závislosti na způsobu uložení dat by uživatelé měli použít následující taxonomii. Všechno, co se zobrazuje velkými písmeny, je specifické pro váš případ použití:
Hromadné | Formát |
---|---|
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' | Parquet (ADLSg2) |
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; database=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='YOURSECRET' | CosmosDB (Azure Synapse Link) |
Každé pole nahraďte následujícím způsobem:
- "VAŠE BULK ABOVE" je připojovací řetězec zdroje dat, ke kterému se připojujete.
- "TYP VÝŠE" je formát, který používáte pro připojení ke zdroji.
select *
FROM
openrowset(
BULK 'YOUR BULK ABOVE',
FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
contextdataeventTime varchar(50) '$.context.data.eventTime',
contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
contextsessionisFirst varchar(50) '$.context.session.isFirst',
contextsessionid varchar(50) '$.context.session.id',
contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
RoomName varchar(50) '$.customerInfo.RoomName',
CustomerName varchar(50) '$.customerInfo.CustomerName',
UserName varchar(50) '$.customerInfo.UserName'
)
Existují dva různé typy operací:
První typ operace je uveden v následujícím řádku kódu, který definuje sloupec
contextdataeventTime
, který odkazuje na vnořený prvek,Context.Data.eventTime
.contextdataeventTime varchar(50) '$.context.data.eventTime'
Tento řádek definuje sloupec,
contextdataeventTime
který odkazuje na vnořený prvek,Context>Data>eventTime
.Druhý typ operace používá
cross apply
k vytvoření nových řádků pro každý prvek pod polem. Pak definuje každý vnořený objekt.cross apply openjson (contextcustomdimensions) with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
Pokud pole mělo 5 prvků se 4 vnořenými strukturami, bezserverový model SQL vrátí 5 řádků a 4 sloupce. Bezserverový model SQL se může dotazovat na místo, mapovat pole ve 2 řádcích a zobrazit všechny vnořené struktury do sloupců.