Práce s datovými soubory pomocí Sparku
Jednou z výhod používání Sparku je, že můžete psát a spouštět kód v různých programovacích jazycích, což vám umožní používat programovací dovednosti, které už máte, a používat nejvhodnější jazyk pro daný úkol. Výchozím jazykem v novém poznámkovém bloku Azure Databricks Spark je PySpark – verze Pythonu optimalizovaná pro Spark, kterou běžně používají datoví vědci a analytici kvůli silné podpoře manipulace s daty a vizualizací. Kromě toho můžete používat jazyky, jako je Scala (jazyk odvozený z Javy, který se dá interaktivně používat) a SQL (varianta běžně používaného jazyka SQL, který je součástí knihovny Spark SQL pro práci se strukturami relačních dat). Softwaroví inženýři můžou také vytvářet kompilovaná řešení, která běží ve Sparku pomocí architektur, jako je Java.
Zkoumání dat pomocí datových rámců
Spark nativně používá datovou strukturu označovanou jako odolná distribuovaná datová sada (RDD), ale i když můžete psát kód, který funguje přímo se sadami RDD, nejčastěji používanou datovou strukturou pro práci se strukturovanými daty ve Sparku je datový rámec, který je součástí knihovny Spark SQL. Datové rámce ve Sparku jsou podobné datovým rámcům v všudypřítomné knihovně Pandas Pythonu, ale optimalizované pro práci v distribuovaném výpočetním prostředí Sparku.
Poznámka:
Kromě rozhraní API datového rámce poskytuje Spark SQL rozhraní API datové sady se silným typem, které je podporováno v Javě a Scala. V tomto modulu se zaměříme na rozhraní API datového rámce.
Načtení dat do datového rámce
Pojďme se podívat na hypotetický příklad, abyste viděli, jak můžete datový rámec použít k práci s daty. Předpokládejme, že máte v textovém souboru s oddělovači s oddělovači s názvem products.csv ve složce dat ve vašem úložišti systému souborů Databricks (DBFS):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
V poznámkovém bloku Sparku můžete pomocí následujícího kódu PySpark načíst data do datového rámce a zobrazit prvních 10 řádků:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Řádek %pyspark
na začátku se nazývá magie a říká Sparku, že jazyk použitý v této buňce je PySpark. Tady je ekvivalentní kód Scala pro příklad dat produktů:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
Magie %spark
se používá k určení scaly.
Tip
Můžete také vybrat jazyk, který chcete použít pro každou buňku v rozhraní poznámkového bloku.
Oba výše uvedené příklady by vytvořily výstup takto:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain Bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain Bikes | 3399.9900 |
... | ... | ... | ... |
Určení schématu datového rámce
V předchozím příkladu obsahoval první řádek souboru CSV názvy sloupců a Spark dokázal odvodit datový typ každého sloupce z dat, která obsahuje. Můžete také zadat explicitní schéma dat, které je užitečné, když názvy sloupců nejsou zahrnuty do datového souboru, například v tomto příkladu CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Následující příklad PySpark ukazuje, jak zadat schéma datového rámce, který se má načíst ze souboru s názvem product-data.csv v tomto formátu:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Výsledky by se opět podobaly následujícímu:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountain Bikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountain Bikes | 3399.9900 |
... | ... | ... | ... |
Filtrování a seskupování datových rámců
Pomocí metod třídy datového rámce můžete filtrovat, řadit, seskupovat a jinak manipulovat s daty, která obsahuje. Například následující příklad kódu používá metodu select k načtení sloupců ProductName a ListPrice z datového rámce df obsahujícího data produktu v předchozím příkladu:
pricelist_df = df.select("ProductID", "ListPrice")
Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
U většiny metod manipulace s daty vrátí výběr nový objekt datového rámce.
Tip
Výběr podmnožině sloupců z datového rámce je běžná operace, kterou lze dosáhnout také pomocí následující kratší syntaxe:
pricelist_df = df["ProductID", "ListPrice"]
Metody můžete "zřetězovat" a provést řadu manipulací, které mají za následek transformovaný datový rámec. Například tento příklad kódu zřetězí výběr a kde metody vytvoření nového datového rámce obsahujícího sloupce ProductName a ListPrice pro produkty s kategorií horských kol nebo silničních kol:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
K seskupení a agregaci dat můžete použít metodu groupBy a agregační funkce. Například následující kód PySpark spočítá počet produktů pro každou kategorii:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:
Kategorie | count |
---|---|
Sluchátka | 3 |
Wheels | 14 |
Mountain Bikes | 32 |
... | ... |
Použití výrazů SQL ve Sparku
Rozhraní API datového rámce je součástí knihovny Spark s názvem Spark SQL, která datovým analytikům umožňuje dotazovat a manipulovat s daty pomocí výrazů SQL.
Vytváření databázových objektů v katalogu Spark
Katalog Sparku je metastor pro relační datové objekty, jako jsou zobrazení a tabulky. Modul runtime Sparku může pomocí katalogu bezproblémově integrovat kód napsaný v jakémkoli jazyce podporovaném sparkem s výrazy SQL, které můžou být pro některé datové analytiky nebo vývojáře přirozenější.
Jedním z nejjednodušších způsobů, jak zpřístupnit data v datovém rámci pro dotazování v katalogu Spark, je vytvořit dočasné zobrazení, jak je znázorněno v následujícím příkladu kódu:
df.createOrReplaceTempView("products")
Zobrazení je dočasné, což znamená, že se automaticky odstraní na konci aktuální relace. Můžete také vytvořit tabulky , které jsou trvalé v katalogu a definovat databázi, která se dá dotazovat pomocí Spark SQL.
Poznámka:
V tomto modulu nebudeme podrobně zkoumat tabulky katalogu Sparku, ale stojí za to si vzít čas na zvýraznění několika klíčových bodů:
- Prázdnou tabulku můžete vytvořit pomocí
spark.catalog.createTable
metody. Tabulky jsou struktury metadat, které ukládají jejich podkladová data do umístění úložiště přidruženého k katalogu. Odstraněním tabulky se odstraní také její podkladová data. - Datový rámec můžete uložit jako tabulku pomocí jeho
saveAsTable
metody. - Externí tabulku můžete vytvořit pomocí
spark.catalog.createExternalTable
metody. Externí tabulky definují metadata v katalogu, ale získávají podkladová data z externího umístění úložiště; obvykle složku v datovém jezeře. Odstraněním externí tabulky nedojde k odstranění podkladových dat.
Použití rozhraní Spark SQL API k dotazování dat
K dotazování dat v katalogu můžete použít rozhraní SPARK SQL API v kódu napsané v libovolném jazyce. Například následující kód PySpark používá dotaz SQL k vrácení dat z zobrazení produktů jako datového rámce.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Výsledky z příkladu kódu by vypadaly podobně jako v následující tabulce:
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Použití kódu SQL
Předchozí příklad ukazuje, jak pomocí rozhraní Spark SQL API vkládat výrazy SQL do kódu Sparku. V poznámkovém bloku můžete pomocí %sql
magie také spustit kód SQL, který se dotazuje na objekty v katalogu, například takto:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Příklad kódu SQL vrátí sadu výsledků, která se automaticky zobrazí v poznámkovém bloku jako tabulka, například následující:
Kategorie | ProductCount |
---|---|
Bib-Shorts | 3 |
Stojany na kola | 0 |
Kolové stojany | 0 |
... | ... |