Korzystanie z platformy Spark do pracy z plikami danych

Ukończone

Jedną z zalet korzystania z platformy Spark jest możliwość pisania i uruchamiania kodu w różnych językach programowania, dzięki czemu możesz używać już posiadanych umiejętności programistycznych i używać najbardziej odpowiedniego języka dla danego zadania. Domyślnym językiem w nowym notesie platformy Spark usługi Azure Databricks jest PySpark — zoptymalizowana pod kątem platformy Spark wersja języka Python, która jest często używana przez analityków i analityków danych ze względu na silną obsługę manipulowania danymi i wizualizacji. Ponadto można używać języków, takich jak Scala (język pochodny języka Java, który może być używany interaktywnie) i SQL (wariant powszechnie używanego języka SQL zawartego w bibliotece Spark SQL do pracy ze strukturami danych relacyjnych). Inżynierowie oprogramowania mogą również tworzyć skompilowane rozwiązania uruchamiane na platformie Spark przy użyciu platform takich jak Java.

Eksplorowanie danych za pomocą ramek danych

Natywnie platforma Spark używa struktury danych nazywanej odpornym rozproszonym zestawem danych (RDD), ale chociaż można napisać kod, który działa bezpośrednio z RDD, najczęściej używaną strukturą danych do pracy z danymi ustrukturyzowanymi na platformie Spark jest ramka danych, która jest udostępniana jako część biblioteki Spark SQL. Ramki danych na platformie Spark są podobne do tych w wszechobecnej bibliotece języka Python biblioteki Pandas , ale zoptymalizowane pod kątem pracy w środowisku przetwarzania rozproszonego platformy Spark.

Uwaga

Oprócz interfejsu API ramki danych platforma Spark SQL udostępnia silnie typizowane interfejs API zestawu danych , który jest obsługiwany w językach Java i Scala. Skupimy się na interfejsie API ramki danych w tym module.

Ładowanie danych do ramki danych

Przyjrzyjmy się hipotetycznym przykładom, aby zobaczyć, jak można użyć ramki danych do pracy z danymi. Załóżmy, że masz następujące dane w pliku tekstowym rozdzielonym przecinkami o nazwie products.csv w folderze danych w magazynie systemu plików usługi Databricks (DBFS):

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

W notesie platformy Spark możesz użyć następującego kodu PySpark, aby załadować dane do ramki danych i wyświetlić pierwsze 10 wierszy:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Wiersz %pyspark na początku jest nazywany magią i informuje platformę Spark, że język używany w tej komórce to PySpark. Oto odpowiedni kod Scala dla przykładu danych produktów:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Magia %spark służy do określania Scala.

Napiwek

Możesz również wybrać język, którego chcesz użyć dla każdej komórki w interfejsie notesu.

Oba pokazane wcześniej przykłady spowodują wygenerowanie danych wyjściowych w następujący sposób:

ProductID ProductName Kategoria ListPrice
771 Górskie — 100 srebrnych, 38 Rowery górskie 3399.9900
772 Górskie — 100 srebrnych, 42 Rowery górskie 3399.9900
773 Górskie — 100 srebrnych, 44 Rowery górskie 3399.9900
... ... ... ...

Określanie schematu ramki danych

W poprzednim przykładzie pierwszy wiersz pliku CSV zawierał nazwy kolumn, a platforma Spark mogła wywnioskować typ danych każdej kolumny z danych, które zawiera. Można również określić jawny schemat dla danych, co jest przydatne, gdy nazwy kolumn nie są uwzględnione w pliku danych, podobnie jak w tym przykładzie CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

W poniższym przykładzie PySpark pokazano, jak określić schemat ramki danych do załadowania z pliku o nazwie product-data.csv w tym formacie:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Wyniki będą ponownie podobne do następujących:

ProductID ProductName Kategoria ListPrice
771 Górskie — 100 srebrnych, 38 Rowery górskie 3399.9900
772 Górskie — 100 srebrnych, 42 Rowery górskie 3399.9900
773 Górskie — 100 srebrnych, 44 Rowery górskie 3399.9900
... ... ... ...

Filtrowanie i grupowanie ramek danych

Metody klasy Ramka danych umożliwiają filtrowanie, sortowanie, grupowanie i manipulowanie danymi, które zawiera. Na przykład poniższy przykład kodu używa metody select , aby pobrać kolumny ProductName i ListPrice z ramki danych df zawierającej dane produktu w poprzednim przykładzie:

pricelist_df = df.select("ProductID", "ListPrice")

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

W przypadku większości metod manipulowania danymi wybierz polecenie zwraca nowy obiekt ramki danych.

Napiwek

Wybranie podzbioru kolumn z ramki danych jest wspólną operacją, którą można również osiągnąć przy użyciu następującej krótszej składni:

pricelist_df = df["ProductID", "ListPrice"]

Metody "łańcuchowe" można łączyć w celu wykonania serii manipulacji, które skutkują przekształconą ramką danych. Na przykład ten przykładowy kod tworzy łańcuch wyboru i miejsca tworzenia nowej ramki danych zawierającej kolumny ProductName i ListPrice dla produktów z kategorią Mountain Bikes lub Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

ProductName ListPrice
Górskie — 100 srebrnych, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

Aby grupować i agregować dane, możesz użyć metody groupBy i funkcji agregacji. Na przykład następujący kod PySpark zlicza liczbę produktów dla każdej kategorii:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

Kategoria count
Zestawy słuchawkowe 3
Wheels 14
Rowery górskie 32
... ...

Używanie wyrażeń SQL na platformie Spark

Interfejs API ramki danych jest częścią biblioteki Spark o nazwie Spark SQL, która umożliwia analitykom danych używanie wyrażeń SQL do wykonywania zapytań o dane i manipulowania nimi.

Tworzenie obiektów bazy danych w wykazie platformy Spark

Wykaz platformy Spark to magazyn metadanych dla obiektów danych relacyjnych, takich jak widoki i tabele. Środowisko uruchomieniowe platformy Spark może używać wykazu do bezproblemowego integrowania kodu napisanego w dowolnym języku obsługiwanym przez platformę Spark z wyrażeniami SQL, które mogą być bardziej naturalne dla niektórych analityków danych lub deweloperów.

Jednym z najprostszych sposobów udostępniania danych w ramce danych na potrzeby wykonywania zapytań w wykazie platformy Spark jest utworzenie widoku tymczasowego, jak pokazano w poniższym przykładzie kodu:

df.createOrReplaceTempView("products")

Widok jest tymczasowy, co oznacza, że jest on automatycznie usuwany na końcu bieżącej sesji. Możesz również utworzyć tabele , które są utrwalane w wykazie, aby zdefiniować bazę danych, do której można wykonywać zapytania przy użyciu usługi Spark SQL.

Uwaga

Nie będziemy szczegółowo eksplorować tabel wykazu platformy Spark w tym module, ale warto poświęcić trochę czasu na wyróżnienie kilku kluczowych kwestii:

  • Pustą tabelę można utworzyć przy użyciu spark.catalog.createTable metody . Tabele to struktury metadanych, które przechowują swoje dane bazowe w lokalizacji magazynu skojarzonej z wykazem. Usunięcie tabeli powoduje również usunięcie danych bazowych.
  • Ramkę danych można zapisać jako tabelę przy użyciu jej saveAsTable metody.
  • Tabelę zewnętrzną można utworzyć przy użyciu spark.catalog.createExternalTable metody . Tabele zewnętrzne definiują metadane w wykazie, ale pobierają swoje dane bazowe z zewnętrznej lokalizacji magazynu; zazwyczaj folder w usłudze Data Lake. Usunięcie tabeli zewnętrznej nie powoduje usunięcia danych bazowych.

Wykonywanie zapytań dotyczących danych przy użyciu interfejsu API SQL platformy Spark

Interfejs API SQL platformy Spark można używać w kodzie napisanym w dowolnym języku do wykonywania zapytań dotyczących danych w wykazie. Na przykład poniższy kod PySpark używa zapytania SQL do zwracania danych z widoku produktów jako ramki danych.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Wyniki z przykładu kodu będą wyglądać podobnie do poniższej tabeli:

ProductName ListPrice
Górskie — 100 srebrnych, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

Korzystanie z kodu SQL

W poprzednim przykładzie pokazano, jak używać interfejsu API SQL platformy Spark do osadzania wyrażeń SQL w kodzie platformy Spark. W notesie można również użyć %sql funkcji magic do uruchomienia kodu SQL, który wysyła zapytania do obiektów w wykazie, w następujący sposób:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Przykład kodu SQL zwraca zestaw wyników, który jest automatycznie wyświetlany w notesie jako tabela, podobnie jak poniższy:

Kategoria ProductCount
Bib-Shorts 3
Stojaki rowerowe 1
Stojaki rowerowe 1
... ...