Praca z danymi w ramce danych platformy Spark
Natywnie platforma Spark używa struktury danych nazywanej odpornym rozproszonym zestawem danych (RDD), ale chociaż można napisać kod, który działa bezpośrednio z RDD, najczęściej używaną strukturą danych do pracy z danymi ustrukturyzowanymi na platformie Spark jest ramka danych, która jest udostępniana jako część biblioteki Spark SQL. Ramki danych na platformie Spark są podobne do tych w wszechobecnej bibliotece języka Python biblioteki Pandas , ale zoptymalizowane pod kątem pracy w środowisku przetwarzania rozproszonego platformy Spark.
Uwaga
Oprócz interfejsu API ramki danych platforma Spark SQL udostępnia silnie typizowane interfejs API zestawu danych , który jest obsługiwany w językach Java i Scala. Skupimy się na interfejsie API ramki danych w tym module.
Ładowanie danych do ramki danych
Przyjrzyjmy się hipotetycznym przykładom, aby zobaczyć, jak można użyć ramki danych do pracy z danymi. Załóżmy, że masz następujące dane w pliku tekstowym rozdzielonym przecinkami o nazwie products.csv w folderze Pliki/dane w usłudze Lakehouse:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Wnioskowanie schematu
W notesie platformy Spark możesz użyć następującego kodu PySpark, aby załadować dane pliku do ramki danych i wyświetlić pierwsze 10 wierszy:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Wiersz %%pyspark
na początku jest nazywany magią i informuje platformę Spark, że język używany w tej komórce to PySpark. Możesz wybrać język, którego chcesz użyć jako domyślnego na pasku narzędzi interfejsu notesu, a następnie użyć magii, aby zastąpić ten wybór dla określonej komórki. Na przykład oto odpowiedni kod Scala dla danych produktów:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
Magia %%spark
służy do określania Scala.
Oba te przykłady kodu spowodują wygenerowanie danych wyjściowych w następujący sposób:
ProductID | ProductName | Kategoria | ListPrice |
---|---|---|---|
771 | Górskie — 100 srebrnych, 38 | Rowery górskie | 3399.9900 |
772 | Górskie — 100 srebrnych, 42 | Rowery górskie | 3399.9900 |
773 | Górskie — 100 srebrnych, 44 | Rowery górskie | 3399.9900 |
... | ... | ... | ... |
Określanie jawnego schematu
W poprzednim przykładzie pierwszy wiersz pliku CSV zawierał nazwy kolumn, a platforma Spark mogła wywnioskować typ danych każdej kolumny z danych, które zawiera. Można również określić jawny schemat dla danych, co jest przydatne, gdy nazwy kolumn nie są uwzględnione w pliku danych, podobnie jak w tym przykładzie CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
W poniższym przykładzie PySpark pokazano, jak określić schemat ramki danych do załadowania z pliku o nazwie product-data.csv w tym formacie:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Wyniki będą ponownie podobne do następujących:
ProductID | ProductName | Kategoria | ListPrice |
---|---|---|---|
771 | Górskie — 100 srebrnych, 38 | Rowery górskie | 3399.9900 |
772 | Górskie — 100 srebrnych, 42 | Rowery górskie | 3399.9900 |
773 | Górskie — 100 srebrnych, 44 | Rowery górskie | 3399.9900 |
... | ... | ... | ... |
Napiwek
Określanie jawnego schematu zwiększa również wydajność!
Filtrowanie i grupowanie ramek danych
Metody klasy Ramka danych umożliwiają filtrowanie, sortowanie, grupowanie i manipulowanie danymi, które zawiera. Na przykład poniższy przykład kodu używa metody select w celu pobrania kolumn ProductID i ListPrice z ramki danych df zawierającej dane produktu w poprzednim przykładzie:
pricelist_df = df.select("ProductID", "ListPrice")
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
W przypadku większości metod manipulowania danymi wybierz polecenie zwraca nowy obiekt ramki danych.
Napiwek
Wybranie podzbioru kolumn z ramki danych jest wspólną operacją, którą można również osiągnąć przy użyciu następującej krótszej składni:
pricelist_df = df["ProductID", "ListPrice"]
Metody "łańcuchowe" można łączyć w celu wykonania serii manipulacji, które skutkują przekształconą ramką danych. Na przykład ten przykładowy kod tworzy łańcuch wyboru i miejsca tworzenia nowej ramki danych zawierającej kolumny ProductName i ListPrice dla produktów z kategorią Mountain Bikes lub Road Bikes:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
ProductName | Kategoria | ListPrice |
---|---|---|
Górskie — 100 srebrnych, 38 | Rowery górskie | 3399.9900 |
Road-750 Black, 52 | Rowery szosowe | 539.9900 |
... | ... | ... |
Aby grupować i agregować dane, możesz użyć metody groupBy i funkcji agregacji. Na przykład następujący kod PySpark zlicza liczbę produktów dla każdej kategorii:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
Kategoria | count |
---|---|
Zestawy słuchawkowe | 3 |
Wheels | 14 |
Rowery górskie | 32 |
... | ... |
Zapisywanie ramki danych
Często chcesz używać platformy Spark do przekształcania danych pierwotnych i zapisywania wyników w celu dalszej analizy lub przetwarzania podrzędnego. Poniższy przykład kodu zapisuje ramkę danych w pliku parquet w usłudze Data Lake, zastępując dowolny istniejący plik o tej samej nazwie.
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
Uwaga
Format Parquet jest zazwyczaj preferowany w przypadku plików danych, które będą używane do dalszej analizy lub pozyskiwania do magazynu analitycznego. Parquet to bardzo wydajny format obsługiwany przez większość systemów analizy danych na dużą skalę. W rzeczywistości czasami wymaganie dotyczące przekształcania danych może być po prostu konwertowanie danych z innego formatu (takiego jak CSV) na Parquet!
Partycjonowanie pliku wyjściowego
Partycjonowanie to technika optymalizacji, która umożliwia platformie Spark maksymalizację wydajności w węzłach roboczych. Zwiększenie wydajności można osiągnąć podczas filtrowania danych w zapytaniach, eliminując niepotrzebne we/wy dysku.
Aby zapisać ramkę danych jako partycjonowany zestaw plików, użyj metody partitionBy podczas zapisywania danych. Poniższy przykład umożliwia zapisanie ramki danych bikes_df (zawierającej dane produktów dla kategorii rowerów górskich i rowerów drogowych) oraz partycjonowanie danych według kategorii:
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
Nazwy folderów generowane podczas partycjonowania ramki danych obejmują nazwę i wartość kolumny partycjonowania w formacie column=value, więc przykładowy kod tworzy folder o nazwie bike_data zawierający następujące podfoldery:
- Category=Mountain Bikes
- Category=Road Bikes
Każdy podfolder zawiera co najmniej jeden plik parquet z danymi produktu dla odpowiedniej kategorii.
Uwaga
Dane można podzielić na partycje według wielu kolumn, co powoduje hierarchię folderów dla każdego klucza partycjonowania. Można na przykład podzielić dane zamówienia sprzedaży według roku i miesiąca, aby hierarchia folderów zawierała folder dla każdej wartości roku, który z kolei zawiera podfolder dla każdej wartości miesiąca.
Ładowanie danych partycjonowanych
Podczas odczytywania partycjonowanych danych w ramce danych można załadować dane z dowolnego folderu w hierarchii, określając jawne wartości lub symbole wieloznaczne dla pól partycjonowanych. Poniższy przykład ładuje dane dla produktów w kategorii Road Bikes :
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
Uwaga
Kolumny partycjonowania określone w ścieżce pliku zostaną pominięte w wynikowej ramce danych. Wyniki wygenerowane przez przykładowe zapytanie nie zawierają kolumny Category — kategoria dla wszystkich wierszy to Road Bikes.