Analizowanie złożonych typów danych w usłudze Azure Synapse Analytics
Ten artykuł dotyczy plików i kontenerów Parquet w usłudze Azure Synapse Link dla usługi Azure Cosmos DB. Za pomocą platformy Spark lub SQL można odczytywać lub przekształcać dane za pomocą złożonych schematów, takich jak tablice lub struktury zagnieżdżone. Poniższy przykład został ukończony przy użyciu pojedynczego dokumentu, ale można go łatwo skalować do miliardów dokumentów przy użyciu platformy Spark lub języka SQL. Kod zawarty w tym artykule używa narzędzia PySpark (Python).
Przypadek użycia
Złożone typy danych są coraz bardziej powszechne i stanowią wyzwanie dla inżynierów danych. Analizowanie zagnieżdżonego schematu i tablic może obejmować czasochłonne i złożone zapytania SQL. Ponadto zmiana nazwy lub rzutowanie zagnieżdżonych kolumn typu danych może być trudne. Ponadto podczas pracy z głęboko zagnieżdżonych obiektów można napotkać problemy z wydajnością.
Inżynierowie danych muszą zrozumieć, jak efektywnie przetwarzać złożone typy danych i ułatwić im dostęp do wszystkich użytkowników. W poniższym przykładzie używasz platformy Spark w usłudze Azure Synapse Analytics do odczytywania i przekształcania obiektów w płaską strukturę za pomocą ramek danych. Model sql bezserwerowy w usłudze Azure Synapse Analytics umożliwia bezpośrednie wykonywanie zapytań o takie obiekty i zwracanie tych wyników jako zwykłą tabelę.
Co to są tablice i struktury zagnieżdżone?
Poniższy obiekt pochodzi z usługi Application Insights. W tym obiekcie istnieją zagnieżdżone struktury i tablice zawierające struktury zagnieżdżone.
{
"id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
"context": {
"data": {
"eventTime": "2020-06-10T13:43:34.553Z",
"samplingRate": "100.0",
"isSynthetic": "false"
},
"session": {
"isFirst": "false",
"id": "38619c14-7a23-4687-8268-95862c5326b1"
},
"custom": {
"dimensions": [
{
"customerInfo": {
"ProfileType": "ExpertUser",
"RoomName": "",
"CustomerName": "diamond",
"UserName": "XXXX@yahoo.com"
}
},
{
"customerInfo": {
"ProfileType": "Novice",
"RoomName": "",
"CustomerName": "topaz",
"UserName": "XXXX@outlook.com"
}
}
]
}
}
}
Przykład schematu tablic i zagnieżdżonych struktur
Podczas drukowania schematu ramki danych obiektu (o nazwie df) za pomocą polecenia df.printschema
zobaczysz następującą reprezentację:
- Żółty reprezentuje zagnieżdżone struktury.
- Zielony reprezentuje tablicę z dwoma elementami.
_rid
, _ts
i _etag
zostały dodane do systemu, ponieważ dokument został pozyskany do magazynu transakcyjnego usługi Azure Cosmos DB.
Poprzednia ramka danych liczy się tylko dla 5 kolumn i 1 wierszy. Po przekształceniu wyselekcjonowane ramki danych będą miały 13 kolumn i 2 wiersze w formacie tabelarycznym.
Spłaszczają zagnieżdżone struktury i eksplodują tablice
Platforma Spark w usłudze Azure Synapse Analytics umożliwia łatwe przekształcanie zagnieżdżonych struktur w kolumny i elementy tablicy w wiele wierszy. Aby wykonać implementację, wykonaj następujące kroki.
Definiowanie funkcji w celu spłaszczenia zagnieżdżonego schematu
Tej funkcji można używać bez zmiany. Utwórz komórkę w notesie PySpark za pomocą następującej funkcji:
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
Używanie funkcji do spłaszczenia zagnieżdżonego schematu
W tym kroku spłaszczone schematy zagnieżdżonej ramki danych (df) do nowej ramki danych (df_flat
):
from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))
Funkcja wyświetlania powinna zwrócić 10 kolumn i 1 wiersz. Tablica i jej zagnieżdżone elementy są nadal dostępne.
Przekształcanie tablicy
W tym miejscu przekształć tablicę , context_custom_dimensions
w ramce df_flat
danych , w nową ramkę df_flat_explode
danych . W poniższym kodzie zdefiniujesz również kolumnę do wybrania:
from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))
Funkcja wyświetlania powinna zwracać 10 kolumn i 2 wiersze. Następnym krokiem jest spłaszczenie zagnieżdżonych schematów z funkcją zdefiniowaną w kroku 1.
Używanie funkcji do spłaszczenia zagnieżdżonego schematu
Na koniec funkcja służy do spłaszczenia zagnieżdżonego schematu ramki df_flat_explode
danych w nowej ramce danych: df_flat_explode_flat
df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))
Funkcja wyświetlania powinna zawierać 13 kolumn i 2 wiersze.
Funkcja printSchema
ramki df_flat_explode_flat
danych zwraca następujący wynik:
Odczytywanie tablic i zagnieżdżonych struktur bezpośrednio
Model bezserwerowy języka SQL umożliwia wykonywanie zapytań i tworzenie widoków i tabel dla takich obiektów.
Najpierw, w zależności od sposobu przechowywania danych, użytkownicy powinni używać następującej taksonomii. Wszystkie elementy wyświetlane w wielkiej literze są specyficzne dla twojego przypadku użycia:
Zbiorczo | Formatuj |
---|---|
"https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet" | "Parquet" (ADLSg2) |
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; database=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='YOURSECRET' | "CosmosDB" (Azure Synapse Link) |
Zastąp każde pole w następujący sposób:
- "DANE ZBIORCZE POWYŻEJ" to parametry połączenia źródła danych, z którym nawiązujesz połączenie.
- "TWÓJ TYP POWYŻEJ" jest formatem używanym do nawiązywania połączenia ze źródłem.
select *
FROM
openrowset(
BULK 'YOUR BULK ABOVE',
FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
contextdataeventTime varchar(50) '$.context.data.eventTime',
contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
contextsessionisFirst varchar(50) '$.context.session.isFirst',
contextsessionid varchar(50) '$.context.session.id',
contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
RoomName varchar(50) '$.customerInfo.RoomName',
CustomerName varchar(50) '$.customerInfo.CustomerName',
UserName varchar(50) '$.customerInfo.UserName'
)
Istnieją dwa różne typy operacji:
Pierwszy typ operacji jest wskazywany w poniższym wierszu kodu, który definiuje kolumnę o nazwie
contextdataeventTime
, która odwołuje się do zagnieżdżonego elementu ,Context.Data.eventTime
.contextdataeventTime varchar(50) '$.context.data.eventTime'
Ten wiersz definiuje kolumnę o nazwie
contextdataeventTime
, która odwołuje się do zagnieżdżonego elementu ,Context>Data>eventTime
.Drugi typ operacji używa
cross apply
polecenia do tworzenia nowych wierszy dla każdego elementu w tablicy. Następnie definiuje każdy zagnieżdżony obiekt.cross apply openjson (contextcustomdimensions) with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
Jeśli tablica zawiera 5 elementów z 4 zagnieżdżonych struktur, model bezserwerowy sql zwraca 5 wierszy i 4 kolumn. Model bezserwerowy sql może wykonywać zapytania, mapować tablicę w 2 wierszach i wyświetlać wszystkie zagnieżdżone struktury w kolumnach.