Práce s daty v datovém rámci Sparku

Dokončeno

Spark nativně používá datovou strukturu označovanou jako odolná distribuovaná datová sada (RDD), ale i když můžete psát kód, který funguje přímo se sadami RDD, nejčastěji používanou datovou strukturou pro práci se strukturovanými daty ve Sparku je datový rámec, který je součástí knihovny Spark SQL. Datové rámce ve Sparku jsou podobné datovým rámcům v všudypřítomné knihovně Pandas Pythonu, ale optimalizované pro práci v distribuovaném výpočetním prostředí Sparku.

Poznámka:

Kromě rozhraní API datového rámce poskytuje Spark SQL rozhraní API datové sady se silným typem, které je podporováno v Javě a Scala. V tomto modulu se zaměříme na rozhraní API datového rámce.

Načtení dat do datového rámce

Pojďme se podívat na hypotetický příklad, abyste viděli, jak můžete datový rámec použít k práci s daty. Předpokládejme, že máte následující data v textovém souboru s oddělovači s názvem products.csv ve složce Soubory/data ve vašem jezeře:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Odvození schématu

V poznámkovém bloku Sparku můžete pomocí následujícího kódu PySpark načíst data souboru do datového rámce a zobrazit prvních 10 řádků:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Řádek %%pyspark na začátku se nazývá magie a říká Sparku, že jazyk použitý v této buňce je PySpark. Jazyk, který chcete použít jako výchozí, můžete vybrat na panelu nástrojů rozhraní Poznámkový blok a pak pomocí magie přepsat tuto volbu pro konkrétní buňku. Tady je například ekvivalentní kód Scala pro příklad dat produktů:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

Magie %%spark se používá k určení scaly.

Oba tyto ukázky kódu by vytvořily výstup podobný tomuto:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountain Bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain Bikes 3399.9900
... ... ... ...

Určení explicitního schématu

V předchozím příkladu obsahoval první řádek souboru CSV názvy sloupců a Spark dokázal odvodit datový typ každého sloupce z dat, která obsahuje. Můžete také zadat explicitní schéma dat, které je užitečné, když názvy sloupců nejsou zahrnuty do datového souboru, například v tomto příkladu CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Následující příklad PySpark ukazuje, jak zadat schéma datového rámce, který se má načíst ze souboru s názvem product-data.csv v tomto formátu:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Výsledky by se opět podobaly následujícímu:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountain Bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain Bikes 3399.9900
... ... ... ...

Tip

Určení explicitního schématu také zvyšuje výkon!

Filtrování a seskupování datových rámců

Pomocí metod třídy datového rámce můžete filtrovat, řadit, seskupovat a jinak manipulovat s daty, která obsahuje. Například následující příklad kódu používá metodu select k načtení sloupců ProductID a ListPrice z datového rámce df obsahujícího data produktu v předchozím příkladu:

pricelist_df = df.select("ProductID", "ListPrice")

Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

U většiny metod manipulace s daty vrátí výběr nový objekt datového rámce.

Tip

Výběr podmnožině sloupců z datového rámce je běžná operace, kterou lze dosáhnout také pomocí následující kratší syntaxe:

pricelist_df = df["ProductID", "ListPrice"]

Metody můžete "zřetězovat" a provést řadu manipulací, které mají za následek transformovaný datový rámec. Například tento příklad kódu zřetězí výběr a kde metody vytvoření nového datového rámce obsahujícího sloupce ProductName a ListPrice pro produkty s kategorií horských kol nebo silničních kol:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:

ProductName Kategorie ListPrice
Mountain-100 Silver, 38 Mountain Bikes 3399.9900
Road-750 Black, 52 Road Bikes 539.9900
... ... ...

K seskupení a agregaci dat můžete použít metodu groupBy a agregační funkce. Například následující kód PySpark spočítá počet produktů pro každou kategorii:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:

Kategorie count
Sluchátka 3
Wheels 14
Mountain Bikes 32
... ...

Uložení datového rámce

Spark často budete chtít použít k transformaci nezpracovaných dat a uložení výsledků pro další analýzu nebo podřízené zpracování. Následující příklad kódu uloží datový rámec do souboru parquet v datovém jezeře a nahradí všechny existující soubory se stejným názvem.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Poznámka:

Formát Parquet je obvykle upřednostňovaný pro datové soubory, které použijete k další analýze nebo příjmu dat do analytického úložiště. Parquet je velmi efektivní formát, který podporuje většina rozsáhlých systémů pro analýzu dat. V některých případech může být vaším požadavkem na transformaci dat jednoduše převod dat z jiného formátu (například CSV) na Parquet!

Dělení výstupního souboru

Dělení je technika optimalizace, která sparku umožňuje maximalizovat výkon napříč pracovními uzly. Při filtrování dat v dotazech je možné dosáhnout vyššího zvýšení výkonu odstraněním nepotřebných vstupně-výstupních operací disku.

Pokud chcete datový rámec uložit jako dělenou sadu souborů, použijte při zápisu dat metodu partitionBy . Následující příklad uloží datový rámec bikes_df (který obsahuje údaje o produktech pro kategorie horských kol a silničních kol) a rozdělí data podle kategorie:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Názvy složek vygenerované při dělení datového rámce zahrnují název a hodnotu sloupce dělení ve formátu column=value, takže příklad kódu vytvoří složku s názvem bike_data, která obsahuje následující podsložky:

  • Category=Mountain Bikes
  • Category=Silniční kola

Každá podsložka obsahuje jeden nebo více souborů parquet s daty produktů pro příslušnou kategorii.

Poznámka:

Data můžete rozdělit podle několika sloupců, což vede k hierarchii složek pro každý klíč dělení. Můžete například rozdělit data prodejní objednávky podle roku a měsíce, aby hierarchie složek obsahovala složku pro každou hodnotu roku, která zase obsahuje podsložku pro každou hodnotu měsíce.

Načtení dělených dat

Při čtení dělených dat do datového rámce můžete načíst data z libovolné složky v hierarchii zadáním explicitních hodnot nebo zástupných znaků pro dělená pole. Následující příklad načte data pro produkty v kategorii Silniční kola :

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Poznámka:

Ve výsledném datovém rámci se vynechá sloupce dělení zadané v cestě k souboru. Výsledky vytvořené ukázkovým dotazem by neobsadovaly sloupec Kategorie – kategorie pro všechny řádky by byla Silniční kola.