Udostępnij za pośrednictwem


Analizowanie złożonych typów danych w usłudze Azure Synapse Analytics

Ten artykuł dotyczy plików i kontenerów Parquet w usłudze Azure Synapse Link dla usługi Azure Cosmos DB. Za pomocą platformy Spark lub SQL można odczytywać lub przekształcać dane za pomocą złożonych schematów, takich jak tablice lub struktury zagnieżdżone. Poniższy przykład został ukończony przy użyciu pojedynczego dokumentu, ale można go łatwo skalować do miliardów dokumentów przy użyciu platformy Spark lub języka SQL. Kod zawarty w tym artykule używa narzędzia PySpark (Python).

Przypadek użycia

Złożone typy danych są coraz bardziej powszechne i stanowią wyzwanie dla inżynierów danych. Analizowanie zagnieżdżonego schematu i tablic może obejmować czasochłonne i złożone zapytania SQL. Ponadto zmiana nazwy lub rzutowanie zagnieżdżonych kolumn typu danych może być trudne. Ponadto podczas pracy z głęboko zagnieżdżonych obiektów można napotkać problemy z wydajnością.

Inżynierowie danych muszą zrozumieć, jak efektywnie przetwarzać złożone typy danych i ułatwić im dostęp do wszystkich użytkowników. W poniższym przykładzie używasz platformy Spark w usłudze Azure Synapse Analytics do odczytywania i przekształcania obiektów w płaską strukturę za pomocą ramek danych. Model sql bezserwerowy w usłudze Azure Synapse Analytics umożliwia bezpośrednie wykonywanie zapytań o takie obiekty i zwracanie tych wyników jako zwykłą tabelę.

Co to są tablice i struktury zagnieżdżone?

Poniższy obiekt pochodzi z usługi Application Insights. W tym obiekcie istnieją zagnieżdżone struktury i tablice zawierające struktury zagnieżdżone.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Przykład schematu tablic i zagnieżdżonych struktur

Podczas drukowania schematu ramki danych obiektu (o nazwie df) za pomocą polecenia df.printschemazobaczysz następującą reprezentację:

  • Żółty reprezentuje zagnieżdżone struktury.
  • Zielony reprezentuje tablicę z dwoma elementami.

Kod z żółtym i zielonym wyróżnianiem przedstawiającym źródło schematu

_rid, _tsi _etag zostały dodane do systemu, ponieważ dokument został pozyskany do magazynu transakcyjnego usługi Azure Cosmos DB.

Poprzednia ramka danych liczy się tylko dla 5 kolumn i 1 wierszy. Po przekształceniu wyselekcjonowane ramki danych będą miały 13 kolumn i 2 wiersze w formacie tabelarycznym.

Spłaszczają zagnieżdżone struktury i eksplodują tablice

Platforma Spark w usłudze Azure Synapse Analytics umożliwia łatwe przekształcanie zagnieżdżonych struktur w kolumny i elementy tablicy w wiele wierszy. Aby wykonać implementację, wykonaj następujące kroki.

Schemat blokowy przedstawiający kroki przekształceń platformy Spark

Definiowanie funkcji w celu spłaszczenia zagnieżdżonego schematu

Tej funkcji można używać bez zmiany. Utwórz komórkę w notesie PySpark za pomocą następującej funkcji:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Używanie funkcji do spłaszczenia zagnieżdżonego schematu

W tym kroku spłaszczone schematy zagnieżdżonej ramki danych (df) do nowej ramki danych (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

Funkcja wyświetlania powinna zwrócić 10 kolumn i 1 wiersz. Tablica i jej zagnieżdżone elementy są nadal dostępne.

Przekształcanie tablicy

W tym miejscu przekształć tablicę , context_custom_dimensionsw ramce df_flatdanych , w nową ramkę df_flat_explodedanych . W poniższym kodzie zdefiniujesz również kolumnę do wybrania:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

Funkcja wyświetlania powinna zwracać 10 kolumn i 2 wiersze. Następnym krokiem jest spłaszczenie zagnieżdżonych schematów z funkcją zdefiniowaną w kroku 1.

Używanie funkcji do spłaszczenia zagnieżdżonego schematu

Na koniec funkcja służy do spłaszczenia zagnieżdżonego schematu ramki df_flat_explodedanych w nowej ramce danych: df_flat_explode_flat

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

Funkcja wyświetlania powinna zawierać 13 kolumn i 2 wiersze.

Funkcja printSchema ramki df_flat_explode_flat danych zwraca następujący wynik:

Kod przedstawiający ostateczny schemat

Odczytywanie tablic i zagnieżdżonych struktur bezpośrednio

Model bezserwerowy języka SQL umożliwia wykonywanie zapytań i tworzenie widoków i tabel dla takich obiektów.

Najpierw, w zależności od sposobu przechowywania danych, użytkownicy powinni używać następującej taksonomii. Wszystkie elementy wyświetlane w wielkiej literze są specyficzne dla twojego przypadku użycia:

Zbiorczo Formatuj
"https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet" "Parquet" (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; database=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='YOURSECRET' "CosmosDB" (Azure Synapse Link)

Zastąp każde pole w następujący sposób:

  • "DANE ZBIORCZE POWYŻEJ" to parametry połączenia źródła danych, z którym nawiązujesz połączenie.
  • "TWÓJ TYP POWYŻEJ" jest formatem używanym do nawiązywania połączenia ze źródłem.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Istnieją dwa różne typy operacji:

  • Pierwszy typ operacji jest wskazywany w poniższym wierszu kodu, który definiuje kolumnę o nazwie contextdataeventTime , która odwołuje się do zagnieżdżonego elementu , Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Ten wiersz definiuje kolumnę o nazwie contextdataeventTime , która odwołuje się do zagnieżdżonego elementu , Context>Data>eventTime.

  • Drugi typ operacji używa cross apply polecenia do tworzenia nowych wierszy dla każdego elementu w tablicy. Następnie definiuje każdy zagnieżdżony obiekt.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Jeśli tablica zawiera 5 elementów z 4 zagnieżdżonych struktur, model bezserwerowy sql zwraca 5 wierszy i 4 kolumn. Model bezserwerowy sql może wykonywać zapytania, mapować tablicę w 2 wierszach i wyświetlać wszystkie zagnieżdżone struktury w kolumnach.

Następne kroki