Sdílet prostřednictvím


Analýza složitých datových typů ve službě Azure Synapse Analytics

Tento článek je relevantní pro soubory a kontejnery Parquet ve službě Azure Synapse Link pro Azure Cosmos DB. Spark nebo SQL můžete použít ke čtení nebo transformaci dat pomocí složitých schémat, jako jsou pole nebo vnořené struktury. Následující příklad je dokončen s jedním dokumentem, ale může se snadno škálovat na miliardy dokumentů pomocí Sparku nebo SQL. Kód, který je součástí tohoto článku, používá PySpark (Python).

Případ použití

Složité datové typy jsou stále častější a představují výzvu pro datové inženýry. Analýza vnořených schémat a polí může zahrnovat časově náročné a složité dotazy SQL. Kromě toho může být obtížné přejmenovávat nebo přetypovat datový typ vnořených sloupců. Při práci s hluboko vnořenými objekty můžete také narazit na problémy s výkonem.

Datoví inženýři potřebují pochopit, jak efektivně zpracovávat složité datové typy a jak je snadno zpřístupnit všem. V následujícím příkladu použijete Spark ve službě Azure Synapse Analytics ke čtení a transformaci objektů do ploché struktury prostřednictvím datových rámců. K přímému dotazování těchto objektů použijete bezserverový model SQL v Azure Synapse Analytics a tyto výsledky vrátíte jako běžnou tabulku.

Co jsou pole a vnořené struktury?

Následující objekt pochází z Application Insights. V tomto objektu jsou vnořené struktury a pole, které obsahují vnořené struktury.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Příklad schématu polí a vnořených struktur

Při tisku schématu datového rámce objektu (označovaného jako df) pomocí příkazu df.printschemase zobrazí následující reprezentace:

  • Žlutá představuje vnořené struktury.
  • Zelená představuje pole se dvěma prvky.

Kód se žlutým a zeleným zvýrazněním a zobrazením původu schématu

_rid, _tsa _etag byly přidány do systému, protože dokument byl ingestován do transakčního úložiště Azure Cosmos DB.

Předchozí datový rámec počítá pouze pro 5 sloupců a 1 řádek. Po transformaci bude kurátorovaný datový rámec obsahovat 13 sloupců a 2 řádky v tabulkovém formátu.

Zploštěné vnořené struktury a rozložené pole

Spark ve službě Azure Synapse Analytics umožňuje snadno transformovat vnořené struktury do sloupců a prvků pole na několik řádků. K implementaci použijte následující kroky.

Vývojový diagram znázorňující kroky pro transformace Sparku

Definování funkce pro zploštěné schéma

Tuto funkci můžete použít beze změny. Vytvořte buňku v poznámkovém bloku PySpark s následující funkcí:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Použití funkce k zploštění vnořeného schématu

V tomto kroku zploštěné schéma datového rámce (df) zploštěte do nového datového rámce (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

Funkce zobrazení by měla vrátit 10 sloupců a 1 řádek. Pole a jeho vnořené prvky jsou stále tam.

Transformace pole

Zde transformujete pole v context_custom_dimensionsdatovém rámci df_flatna nový datový rámec df_flat_explode. V následujícím kódu také definujete, který sloupec se má vybrat:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

Funkce zobrazení by měla vrátit 10 sloupců a 2 řádky. Dalším krokem je zploštění vnořených schémat pomocí funkce definované v kroku 1.

Použití funkce k zploštění vnořeného schématu

Nakonec funkci použijete k zploštění vnořeného schématu datového rámce df_flat_explodedo nového datového rámce: df_flat_explode_flat

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

Funkce zobrazení by měla zobrazit 13 sloupců a 2 řádky.

Funkce printSchema datového rámce df_flat_explode_flat vrátí následující výsledek:

Kód znázorňující konečné schéma

Čtení polí a vnořených struktur přímo

S bezserverovým modelem SQL můžete dotazovat a vytvářet zobrazení a tabulky nad těmito objekty.

Nejprve v závislosti na způsobu uložení dat by uživatelé měli použít následující taxonomii. Všechno, co se zobrazuje velkými písmeny, je specifické pro váš případ použití:

Hromadné Formát
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' Parquet (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; database=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='YOURSECRET' CosmosDB (Azure Synapse Link)

Každé pole nahraďte následujícím způsobem:

  • "VAŠE BULK ABOVE" je připojovací řetězec zdroje dat, ke kterému se připojujete.
  • "TYP VÝŠE" je formát, který používáte pro připojení ke zdroji.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Existují dva různé typy operací:

  • První typ operace je uveden v následujícím řádku kódu, který definuje sloupec contextdataeventTime , který odkazuje na vnořený prvek, Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Tento řádek definuje sloupec, contextdataeventTime který odkazuje na vnořený prvek, Context>Data>eventTime.

  • Druhý typ operace používá cross apply k vytvoření nových řádků pro každý prvek pod polem. Pak definuje každý vnořený objekt.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Pokud pole mělo 5 prvků se 4 vnořenými strukturami, bezserverový model SQL vrátí 5 řádků a 4 sloupce. Bezserverový model SQL se může dotazovat na místo, mapovat pole ve 2 řádcích a zobrazit všechny vnořené struktury do sloupců.

Další kroky