Základy PySparku
Tento článek vás provede jednoduchými příklady, které ilustrují použití PySparku. Předpokládá, že rozumíte základním konceptům Apache Sparku a spouštíte příkazy v poznámkovém bloku Azure Databricks připojeném k výpočetním prostředkům. Datové rámce vytvoříte pomocí ukázkových dat, provedete základní transformace, včetně operací řádků a sloupců s daty, zkombinujete více datových rámců a agregujete tato data, vizualizujete tato data a pak je uložíte do tabulky nebo souboru.
Nahrání dat
Některé příklady v tomto článku používají ukázková data poskytovaná službou Databricks k předvedení použití datových rámců k načtení, transformaci a ukládání dat. Pokud chcete použít vlastní data, která ještě nejsou v Databricks, můžete ho nejdřív nahrát a vytvořit z něj datový rámec. Viz Vytvoření nebo úprava tabulky pomocí nahrání a nahrání souborů do svazku katalogu Unity.
Ukázková data Databricks
Databricks poskytuje ukázková data v samples
katalogu a v adresáři /databricks-datasets
.
- Pokud chcete získat přístup k ukázkovým datům v
samples
katalogu, použijte formátsamples.<schema-name>.<table-name>
. Tento článek používá tabulky ve schématusamples.tpch
, které obsahují data z fiktivní firmy. Tabulkacustomer
obsahuje informace o zákaznících aorders
obsahuje informace o objednávkách zadaných těmito zákazníky. - Slouží
dbutils.fs.ls
k prozkoumání dat v/databricks-datasets
. Pomocí Spark SQL nebo datových rámců můžete dotazovat data v tomto umístění pomocí cest k souborům. Další informace o ukázkových datech poskytovaných službou Databricks najdete v tématu Ukázkové datové sady.
Import datových typů
Mnoho operací PySpark vyžaduje použití funkcí SQL nebo interakci s nativními typy Sparku. Můžete buď přímo importovat jenom ty funkce a typy, které potřebujete, nebo můžete importovat celý modul.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Vzhledem k tomu, že některé importované funkce můžou přepsat integrované funkce Pythonu, někteří uživatelé se rozhodnou tyto moduly importovat pomocí aliasu. Následující příklady ukazují běžný alias použitý v příkladech kódu Apache Sparku:
import pyspark.sql.types as T
import pyspark.sql.functions as F
Úplný seznam datových typů najdete v tématu Datové typy Sparku.
Úplný seznam funkcí SQL PySpark najdete v tématu Funkce Sparku.
Vytvoření datového rámce
Existuje několik způsobů, jak vytvořit datový rámec. Obvykle definujete datový rámec pro zdroj dat, jako je tabulka nebo kolekce souborů. Jak je popsáno v části Základních konceptů Apache Sparku, použijte akci, napříkladdisplay
, k aktivaci transformací, které se mají provést. Metoda display
vypíše datové rámce.
Vytvoření datového rámce se zadanými hodnotami
Pokud chcete vytvořit datový rámec se zadanými hodnotami, použijte metodu createDataFrame
, kde jsou řádky vyjádřeny jako seznam řazených kolekcí členů:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Všimněte si ve výstupu, že datové typy sloupců df_children
jsou automaticky odvozeny. Typy můžete také zadat přidáním schématu. Schémata jsou definována pomocí StructType
toho, která je tvořena zadáním názvu, datového StructFields
typu a logického příznaku označujícího, zda obsahují hodnotu null nebo ne. Je nutné importovat datové typy z pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Vytvoření datového rámce z tabulky v katalogu Unity
Chcete-li vytvořit datový rámec z tabulky v katalogu Unity, použijte table
metodu identifikující tabulku pomocí formátu <catalog-name>.<schema-name>.<table-name>
. Kliknutím na Katalog na levém navigačním panelu přejděte do tabulky pomocí Průzkumníka katalogu. Klikněte na něj a pak vyberte Kopírovat cestu k tabulce a vložte ji do poznámkového bloku.
Následující příklad načte tabulku samples.tpch.customer
, ale můžete alternativně zadat cestu k vlastní tabulce.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Vytvoření datového rámce z nahraného souboru
Pokud chcete vytvořit datový rámec ze souboru, který jste nahráli do svazků katalogu Unity, použijte read
tuto vlastnost. Tato metoda vrátí DataFrameReader
hodnotu , kterou pak můžete použít ke čtení příslušného formátu. Klikněte na možnost katalogu na malém bočním panelu vlevo a pomocí prohlížeče katalogu vyhledejte soubor. Vyberte ji a potom klikněte na Kopírovat cestu k souboru svazku.
Následující příklad čte ze *.csv
souboru, ale DataFrameReader
podporuje nahrávání souborů v mnoha dalších formátech. Viz Metody DataFrameReader.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Další informace o svazcích katalogu Unity najdete v tématu Co jsou svazky katalogu Unity?.
Vytvoření datového rámce z odpovědi JSON
Pokud chcete vytvořit datový rámec z datové části odpovědi JSON vrácené rozhraním REST API, použijte balíček Pythonu requests
k dotazování a analýze odpovědi. Abyste ho mohli použít, musíte balíček importovat. Tento příklad používá data z databáze aplikace drog USA Food and Drug Administration.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Informace o práci s JSON a dalšími částečně strukturovanými daty v Databricks najdete v tématu Částečně strukturovaná data modelu.
Výběr pole nebo objektu JSON
Pokud chcete vybrat konkrétní pole nebo objekt z převedeného formátu JSON, použijte []
zápis. Chcete-li například vybrat products
pole, které je samotné pole produktů:
display(df_drugs.select(df_drugs["products"]))
Můžete také zřetězit volání metod pro procházení více polí. Například pro výstup názvu značky prvního produktu v aplikaci léku:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Vytvoření datového rámce ze souboru
Abychom si ukázali vytvoření datového rámce ze souboru, tento příklad načte data CSV v /databricks-datasets
adresáři.
K přechodu na ukázkové datové sady můžete použít příkazy systému souborů Databricks Utilties . Následující příklad používá dbutils
k výpisu datových sad dostupných v /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
Alternativně můžete použít %fs
přístup k příkazům systému souborů Rozhraní příkazového řádku Databricks, jak je znázorněno v následujícím příkladu:
%fs ls '/databricks-datasets'
Pokud chcete vytvořit datový rámec ze souboru nebo adresáře souborů, zadejte cestu v load
metodě:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transformace dat pomocí datových rámců
Datové rámce usnadňují transformaci dat pomocí předdefinovaných metod řazení, filtrování a agregace dat. Mnoho transformací není určeno jako metody v datových rámcích, ale místo toho jsou součástí spark.sql.functions
balíčku. Viz Funkce SQL Pro Databricks Spark.
Operace se sloupci
Spark poskytuje mnoho základních operací se sloupci:
Tip
K výstupu všech sloupců v datovém rámci použijte columns
například df_customer.columns
.
Vyberte sloupce
Můžete vybrat konkrétní sloupce pomocí select
a col
. Funkce col
je v dílčím pyspark.sql.functions
modulu.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Můžete také odkazovat na sloupec, expr
který používá výraz definovaný jako řetězec:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Můžete také použít selectExpr
, který přijímá výrazy SQL:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Pokud chcete vybrat sloupce pomocí řetězcového literálu, postupujte takto:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Pokud chcete explicitně vybrat sloupec z konkrétního datového rámce, můžete použít []
operátor nebo .
operátor. (Operátor .
nelze použít k výběru sloupců začínajících celým číslem nebo sloupce, které obsahují mezeru nebo speciální znak.) To může být užitečné zejména při připojování datových rámců, kde mají některé sloupce stejný název.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Vytvoření sloupců
K vytvoření nového sloupce použijte metodu withColumn
. Následující příklad vytvoří nový sloupec, který obsahuje logickou hodnotu na základě toho, jestli zůstatek c_acctbal
na účtu zákazníka 1000
překračuje:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Přejmenování sloupců
Pokud chcete přejmenovat sloupec, použijte metodu withColumnRenamed
, která přijímá existující a nové názvy sloupců:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
Tato alias
metoda je užitečná hlavně v případech, kdy chcete sloupce přejmenovat jako součást agregací:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Přetypování typů sloupců
V některých případech můžete chtít změnit datový typ pro jeden nebo více sloupců v datovém rámci. K tomu použijte metodu cast
pro převod mezi datovými typy sloupců. Následující příklad ukazuje, jak převést sloupec z celého čísla na typ řetězce pomocí col
metody odkazování na sloupec:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Odebrání sloupců
Pokud chcete sloupce odebrat, můžete sloupce vynechat během výběru nebo select(*) except
můžete použít metodu drop
:
df_customer_flag_renamed.drop("balance_flag_renamed")
Můžete také odstranit více sloupců najednou:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Operace řádků
Spark poskytuje mnoho základních operací řádků:
- Filtrování řádků
- Odebrání duplicitních řádků
- Zpracování hodnot null
- Přidávat řádky
- Řazení řádků
- Filtrování řádků
Řádky filtru
Chcete-li filtrovat řádky, použijte metodu filter
datového where
rámce k vrácení pouze určitých řádků. K identifikaci sloupce, podle kterého se má filtrovat, použijte metodu col
nebo výraz, který se vyhodnotí jako sloupec.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Pokud chcete filtrovat podle více podmínek, použijte logické operátory. Můžete například &
|
a umožnit vám to AND
a OR
podmínky. Následující příklad filtruje řádky, ve kterých je rovna c_nationkey
20
a c_acctbal
je větší než 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Odebrání duplicitních řádků
Pokud chcete zrušit duplicitní řádky, použijte distinct
příkaz , který vrátí pouze jedinečné řádky.
df_unique = df_customer.distinct()
Zpracování hodnot null
Chcete-li zpracovat hodnoty null, odstraňte řádky, které obsahují hodnoty null pomocí na.drop
metody. Tato metoda umožňuje určit, zda chcete vypustit řádky obsahující any
hodnoty null nebo all
hodnoty null.
K vyřazení všech hodnot null použijte některý z následujících příkladů.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Pokud místo toho chcete vyfiltrovat pouze řádky, které obsahují všechny hodnoty null, použijte následující:
df_customer_no_nulls = df_customer.na.drop("all")
Tuto možnost můžete použít pro podmnožinu sloupců tak, že ji zadáte, jak je znázorněno níže:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
K vyplnění chybějících hodnot použijte metodu fill
. Tuto možnost můžete použít pro všechny sloupce nebo podmnožinu sloupců. V následujícím příkladu jsou zůstatky účtů, které mají hodnotu null pro zůstatek c_acctbal
účtu, vyplněny 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
K nahrazení řetězců jinými hodnotami použijte metodu replace
. V následujícím příkladu jsou všechny prázdné řetězce adres nahrazeny slovem UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Přidávat řádky
Pokud chcete přidat řádky, musíte použít metodu union
k vytvoření nového datového rámce. V následujícím příkladu se datový rámec df_that_one_customer
vytvořený dříve a df_filtered_customer
zkombinuje se, který vrátí datový rámec se třemi zákazníky:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Poznámka:
Datové rámce můžete také zkombinovat tak, že je zapíšete do tabulky a pak připojíte nové řádky. U produkčních úloh může přírůstkové zpracování zdrojů dat do cílové tabulky výrazně snížit latenci a náklady na výpočetní prostředky při růstu dat. Podívejte se na Ingestování dat do databricks lakehouse.
Řadit řádky
Důležité
Řazení může být nákladné ve velkém měřítku a pokud ukládáte seřazená data a data znovu načítáte pomocí Sparku, pořadí není zaručené. Ujistěte se, že používáte řazení záměrně.
Pokud chcete řadit řádky podle jednoho nebo více sloupců, použijte metodu nebo orderBy
metodusort
. Ve výchozím nastavení se tyto metody seřadí ve vzestupném pořadí:
df_customer.orderBy(col("c_acctbal"))
Pokud chcete filtrovat sestupně, použijte desc
:
df_customer.sort(col("c_custkey").desc())
Následující příklad ukazuje, jak řadit podle dvou sloupců:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Pokud chcete omezit počet řádků, které se mají vrátit po seřazení datového rámce, použijte metodu limit
. Následující příklad zobrazí pouze nejlepší 10
výsledky:
display(df_sorted.limit(10))
Spojení datových rámců
Chcete-li spojit dva nebo více datových rámců, použijte metodu join
. Můžete určit, jak se mají datové rámce spojit v parametrech how
(typ spojení) a on
(na kterých sloupcích se má spojení založit). Mezi běžné typy spojení patří:
inner
: Toto je výchozí typ spojení, který vrací datový rámec, který uchovává pouze řádky, ve kterých je shoda proon
parametr napříč datovými rámci.left
: Tím se zachová všechny řádky prvního zadaného datového rámce a pouze řádky z druhého zadaného datového rámce, které mají shodu s prvním datovým rámcem.outer
: Vnější spojení uchovává všechny řádky z obou datových rámců bez ohledu na shodu.
Podrobné informace o spojeních najdete v tématu Práce s spojeními v Azure Databricks. Seznam spojení podporovaných v PySparku najdete v tématu Spojení datového rámce.
Následující příklad vrátí jeden datový rámec, kde každý řádek datového orders
rámce je spojen s odpovídajícím řádkem z objektu customers
DataFrame. Použije se vnitřní spojení, protože očekává se, že každá objednávka odpovídá přesně jednomu zákazníkovi.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Pokud se chcete spojit s několika podmínkami, použijte logické operátory, jako &
jsou a |
určit AND
a OR
v uvedeném pořadí. Následující příklad přidá další podmínku, která filtruje pouze řádky, které mají o_totalprice
větší než 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Agregace dat
Pokud chcete agregovat data v datovém rámci, podobně jako GROUP BY
v SQL, použijte groupBy
metodu k určení sloupců, podle kterých se mají seskupit, a agg
metody k určení agregací. Import společných agregací, včetně avg
, sum
, max
a min
z pyspark.sql.functions
. Následující příklad ukazuje průměrný zůstatek zákazníků podle segmentu trhu:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Některé agregace jsou akce, což znamená, že aktivují výpočty. V takovém případě nemusíte k výstupu výsledků používat jiné akce.
Pokud chcete spočítat řádky v datovém rámci, použijte metodu count
:
df_customer.count()
Řetězení volání
Metody, které transformují datové rámce, vrací datové rámce a Spark nečiní transformace, dokud nebudou volány akce. Toto opožděné vyhodnocení znamená, že můžete zřetězovat více metod pro usnadnění a čitelnost. Následující příklad ukazuje, jak zřetězovat filtrování, agregaci a řazení:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Vizualizace datového rámce
Pokud chcete vizualizovat datový rámec v poznámkovém bloku, klikněte na + znaménko vedle tabulky v levém horním rohu datového rámce a pak vyberte Vizualizace a přidejte jeden nebo více grafů založených na datovém rámci. Podrobnosti o vizualizacích najdete v tématu Vizualizace v poznámkových blocích Databricks.
display(df_order)
Aby bylo možné provádět další vizualizace, Databricks doporučuje používat rozhraní PANDAS API pro Spark. Umožňuje .pandas_api()
přetypovat do odpovídajícího rozhraní PANDAS API pro datový rámec Sparku. Další informace najdete v tématu Rozhraní Pandas API ve Sparku.
Uložení dat
Jakmile data transformujete, můžete je uložit pomocí DataFrameWriter
metod. Úplný seznam těchto metod najdete v objektu DataFrameWriter. Následující části ukazují, jak datový rámec uložit jako tabulku a jako kolekci datových souborů.
Uložení datového rámce jako tabulky
Pokud chcete datový rámec uložit jako tabulku v katalogu Unity, použijte metodu write.saveAsTable
a zadejte cestu ve formátu <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Zápis datového rámce jako SOUBORU CSV
K zápisu datového rámce do *.csv
formátu použijte metodu write.csv
a určete formát a možnosti. Ve výchozím nastavení platí, že pokud data existují v zadané cestě, operace zápisu selže. Pokud chcete provést jinou akci, můžete zadat jeden z následujících režimů:
overwrite
přepíše všechna existující data v cílové cestě obsahem datového rámce.append
připojí obsah datového rámce k datům v cílové cestě.ignore
bezobslužně selže zápis, pokud data v cílové cestě existují.
Následující příklad ukazuje přepsání dat s obsahem datového rámce jako soubory CSV:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Další kroky
Pokud chcete využít více možností Sparku v Databricks, přečtěte si: