Analizowanie danych za pomocą platformy Spark
Jedną z zalet korzystania z platformy Spark jest możliwość pisania i uruchamiania kodu w różnych językach programowania, dzięki czemu możesz używać już posiadanych umiejętności programistycznych i używać najbardziej odpowiedniego języka dla danego zadania. Domyślnym językiem w nowym notesie platformy Spark usługi Azure Synapse Analytics jest PySpark — zoptymalizowana pod kątem platformy Spark wersja języka Python, która jest często używana przez analityków i analityków danych ze względu na silną obsługę manipulowania danymi i wizualizacji. Ponadto można używać języków, takich jak Scala (język pochodny języka Java, który może być używany interaktywnie) i SQL (wariant powszechnie używanego języka SQL zawartego w bibliotece Spark SQL do pracy ze strukturami danych relacyjnych). Inżynierowie oprogramowania mogą również tworzyć skompilowane rozwiązania uruchamiane na platformie Spark przy użyciu platform takich jak Java i Microsoft .NET.
Eksplorowanie danych za pomocą ramek danych
Natywnie platforma Spark używa struktury danych nazywanej odpornym rozproszonym zestawem danych (RDD), ale chociaż można napisać kod, który działa bezpośrednio z RDD, najczęściej używaną strukturą danych do pracy z danymi ustrukturyzowanymi na platformie Spark jest ramka danych, która jest udostępniana jako część biblioteki Spark SQL. Ramki danych na platformie Spark są podobne do tych w wszechobecnej bibliotece języka Python biblioteki Pandas , ale zoptymalizowane pod kątem pracy w środowisku przetwarzania rozproszonego platformy Spark.
Uwaga
Oprócz interfejsu API ramki danych platforma Spark SQL udostępnia silnie typizowane interfejs API zestawu danych , który jest obsługiwany w językach Java i Scala. Skupimy się na interfejsie API ramki danych w tym module.
Ładowanie danych do ramki danych
Przyjrzyjmy się hipotetycznym przykładom, aby zobaczyć, jak można użyć ramki danych do pracy z danymi. Załóżmy, że masz następujące dane w pliku tekstowym rozdzielonym przecinkami o nazwie products.csv na podstawowym koncie magazynu dla obszaru roboczego usługi Azure Synapse Analytics:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
W notesie platformy Spark możesz użyć następującego kodu PySpark, aby załadować dane do ramki danych i wyświetlić pierwsze 10 wierszy:
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Wiersz %%pyspark
na początku jest nazywany magią i informuje platformę Spark, że język używany w tej komórce to PySpark. Możesz wybrać język, którego chcesz użyć jako domyślnego na pasku narzędzi interfejsu notesu, a następnie użyć magii, aby zastąpić ten wybór dla określonej komórki. Na przykład oto odpowiedni kod Scala dla danych produktów:
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
Magia %%spark
służy do określania Scala.
Oba te przykłady kodu spowodują wygenerowanie danych wyjściowych w następujący sposób:
ProductID | ProductName | Kategoria | ListPrice |
---|---|---|---|
771 | Górskie — 100 srebrnych, 38 | Rowery górskie | 3399.9900 |
772 | Górskie — 100 srebrnych, 42 | Rowery górskie | 3399.9900 |
773 | Górskie — 100 srebrnych, 44 | Rowery górskie | 3399.9900 |
... | ... | ... | ... |
Określanie schematu ramki danych
W poprzednim przykładzie pierwszy wiersz pliku CSV zawierał nazwy kolumn, a platforma Spark mogła wywnioskować typ danych każdej kolumny z danych, które zawiera. Można również określić jawny schemat dla danych, co jest przydatne, gdy nazwy kolumn nie są uwzględnione w pliku danych, podobnie jak w tym przykładzie CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
W poniższym przykładzie PySpark pokazano, jak określić schemat ramki danych do załadowania z pliku o nazwie product-data.csv w tym formacie:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Wyniki będą ponownie podobne do następujących:
ProductID | ProductName | Kategoria | ListPrice |
---|---|---|---|
771 | Górskie — 100 srebrnych, 38 | Rowery górskie | 3399.9900 |
772 | Górskie — 100 srebrnych, 42 | Rowery górskie | 3399.9900 |
773 | Górskie — 100 srebrnych, 44 | Rowery górskie | 3399.9900 |
... | ... | ... | ... |
Filtrowanie i grupowanie ramek danych
Metody klasy Ramka danych umożliwiają filtrowanie, sortowanie, grupowanie i manipulowanie danymi, które zawiera. Na przykład poniższy przykład kodu używa metody select , aby pobrać kolumny ProductName i ListPrice z ramki danych df zawierającej dane produktu w poprzednim przykładzie:
pricelist_df = df.select("ProductID", "ListPrice")
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
W przypadku większości metod manipulowania danymi wybierz polecenie zwraca nowy obiekt ramki danych.
Napiwek
Wybranie podzbioru kolumn z ramki danych jest wspólną operacją, którą można również osiągnąć przy użyciu następującej krótszej składni:
pricelist_df = df["ProductID", "ListPrice"]
Metody "łańcuchowe" można łączyć w celu wykonania serii manipulacji, które skutkują przekształconą ramką danych. Na przykład ten przykładowy kod tworzy łańcuch wyboru i miejsca tworzenia nowej ramki danych zawierającej kolumny ProductName i ListPrice dla produktów z kategorią Mountain Bikes lub Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
ProductName | ListPrice |
---|---|
Górskie — 100 srebrnych, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Aby grupować i agregować dane, możesz użyć metody groupBy i funkcji agregacji. Na przykład następujący kod PySpark zlicza liczbę produktów dla każdej kategorii:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
Kategoria | count |
---|---|
Zestawy słuchawkowe | 3 |
Wheels | 14 |
Rowery górskie | 32 |
... | ... |
Używanie wyrażeń SQL na platformie Spark
Interfejs API ramki danych jest częścią biblioteki Spark o nazwie Spark SQL, która umożliwia analitykom danych używanie wyrażeń SQL do wykonywania zapytań o dane i manipulowania nimi.
Tworzenie obiektów bazy danych w wykazie platformy Spark
Wykaz platformy Spark to magazyn metadanych dla obiektów danych relacyjnych, takich jak widoki i tabele. Środowisko uruchomieniowe platformy Spark może używać wykazu do bezproblemowego integrowania kodu napisanego w dowolnym języku obsługiwanym przez platformę Spark z wyrażeniami SQL, które mogą być bardziej naturalne dla niektórych analityków danych lub deweloperów.
Jednym z najprostszych sposobów udostępniania danych w ramce danych na potrzeby wykonywania zapytań w wykazie platformy Spark jest utworzenie widoku tymczasowego, jak pokazano w poniższym przykładzie kodu:
df.createOrReplaceTempView("products")
Widok jest tymczasowy, co oznacza, że jest on automatycznie usuwany na końcu bieżącej sesji. Możesz również utworzyć tabele , które są utrwalane w wykazie, aby zdefiniować bazę danych, do której można wykonywać zapytania przy użyciu usługi Spark SQL.
Uwaga
Nie będziemy szczegółowo eksplorować tabel wykazu platformy Spark w tym module, ale warto poświęcić trochę czasu na wyróżnienie kilku kluczowych kwestii:
- Pustą tabelę można utworzyć przy użyciu
spark.catalog.createTable
metody . Tabele to struktury metadanych, które przechowują swoje dane bazowe w lokalizacji magazynu skojarzonej z wykazem. Usunięcie tabeli powoduje również usunięcie danych bazowych. - Ramkę danych można zapisać jako tabelę przy użyciu jej
saveAsTable
metody. - Tabelę zewnętrzną można utworzyć przy użyciu
spark.catalog.createExternalTable
metody . Tabele zewnętrzne definiują metadane w wykazie, ale pobierają swoje dane bazowe z zewnętrznej lokalizacji magazynu; zazwyczaj folder w usłudze Data Lake. Usunięcie tabeli zewnętrznej nie powoduje usunięcia danych bazowych.
Wykonywanie zapytań dotyczących danych przy użyciu interfejsu API SQL platformy Spark
Interfejs API SQL platformy Spark można używać w kodzie napisanym w dowolnym języku do wykonywania zapytań dotyczących danych w wykazie. Na przykład poniższy kod PySpark używa zapytania SQL do zwracania danych z widoku produktów jako ramki danych.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Wyniki z przykładu kodu będą wyglądać podobnie do poniższej tabeli:
ProductID | ProductName | ListPrice |
---|---|---|
38 | Górskie — 100 srebrnych, 38 | 3399.9900 |
52 | Road-750 Black, 52 | 539.9900 |
... | ... | ... |
Korzystanie z kodu SQL
W poprzednim przykładzie pokazano, jak używać interfejsu API SQL platformy Spark do osadzania wyrażeń SQL w kodzie platformy Spark. W notesie można również użyć %%sql
funkcji magic do uruchomienia kodu SQL, który wysyła zapytania do obiektów w wykazie, w następujący sposób:
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Przykład kodu SQL zwraca zestaw wyników, który jest automatycznie wyświetlany w notesie jako tabela, podobnie jak poniżej:
Kategoria | ProductCount |
---|---|
Bib-Shorts | 3 |
Stojaki rowerowe | 1 |
Stojaki rowerowe | 1 |
... | ... |