Sdílet prostřednictvím


Základy PySparku

Tento článek vás provede jednoduchými příklady, které ilustrují použití PySparku. Předpokládá, že rozumíte základním konceptům Apache Sparku a spouštíte příkazy v poznámkovém bloku Azure Databricks připojeném k výpočetním prostředkům. Datové rámce vytvoříte pomocí ukázkových dat, provedete základní transformace, včetně operací řádků a sloupců s daty, zkombinujete více datových rámců a agregujete tato data, vizualizujete tato data a pak je uložíte do tabulky nebo souboru.

Nahrání dat

Některé příklady v tomto článku používají ukázková data poskytovaná službou Databricks k předvedení použití datových rámců k načtení, transformaci a ukládání dat. Pokud chcete použít vlastní data, která ještě nejsou v Databricks, můžete ho nejdřív nahrát a vytvořit z něj datový rámec. Viz Vytvoření nebo úprava tabulky pomocí nahrání a nahrání souborů do svazku katalogu Unity.

Ukázková data Databricks

Databricks poskytuje ukázková data v samples katalogu a v adresáři /databricks-datasets .

  • Pokud chcete získat přístup k ukázkovým datům v samples katalogu, použijte formát samples.<schema-name>.<table-name>. Tento článek používá tabulky ve schématu samples.tpch , které obsahují data z fiktivní firmy. Tabulka customer obsahuje informace o zákaznících a orders obsahuje informace o objednávkách zadaných těmito zákazníky.
  • Slouží dbutils.fs.ls k prozkoumání dat v /databricks-datasets. Pomocí Spark SQL nebo datových rámců můžete dotazovat data v tomto umístění pomocí cest k souborům. Další informace o ukázkových datech poskytovaných službou Databricks najdete v tématu Ukázkové datové sady.

Import datových typů

Mnoho operací PySpark vyžaduje použití funkcí SQL nebo interakci s nativními typy Sparku. Můžete buď přímo importovat jenom ty funkce a typy, které potřebujete, nebo můžete importovat celý modul.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Vzhledem k tomu, že některé importované funkce můžou přepsat integrované funkce Pythonu, někteří uživatelé se rozhodnou tyto moduly importovat pomocí aliasu. Následující příklady ukazují běžný alias použitý v příkladech kódu Apache Sparku:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Úplný seznam datových typů najdete v tématu Datové typy Sparku.

Úplný seznam funkcí SQL PySpark najdete v tématu Funkce Sparku.

Vytvoření datového rámce

Existuje několik způsobů, jak vytvořit datový rámec. Obvykle definujete datový rámec pro zdroj dat, jako je tabulka nebo kolekce souborů. Jak je popsáno v části Základních konceptů Apache Sparku, použijte akci, napříkladdisplay, k aktivaci transformací, které se mají provést. Metoda display vypíše datové rámce.

Vytvoření datového rámce se zadanými hodnotami

Pokud chcete vytvořit datový rámec se zadanými hodnotami, použijte metodu createDataFrame , kde jsou řádky vyjádřeny jako seznam řazených kolekcí členů:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Všimněte si ve výstupu, že datové typy sloupců df_children jsou automaticky odvozeny. Typy můžete také zadat přidáním schématu. Schémata jsou definována pomocí StructType toho, která je tvořena zadáním názvu, datového StructFields typu a logického příznaku označujícího, zda obsahují hodnotu null nebo ne. Je nutné importovat datové typy z pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Vytvoření datového rámce z tabulky v katalogu Unity

Chcete-li vytvořit datový rámec z tabulky v katalogu Unity, použijte table metodu identifikující tabulku pomocí formátu <catalog-name>.<schema-name>.<table-name>. Kliknutím na Katalog na levém navigačním panelu přejděte do tabulky pomocí Průzkumníka katalogu. Klikněte na něj a pak vyberte Kopírovat cestu k tabulce a vložte ji do poznámkového bloku.

Následující příklad načte tabulku samples.tpch.customer, ale můžete alternativně zadat cestu k vlastní tabulce.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Vytvoření datového rámce z nahraného souboru

Pokud chcete vytvořit datový rámec ze souboru, který jste nahráli do svazků katalogu Unity, použijte read tuto vlastnost. Tato metoda vrátí DataFrameReaderhodnotu , kterou pak můžete použít ke čtení příslušného formátu. Klikněte na možnost katalogu na malém bočním panelu vlevo a pomocí prohlížeče katalogu vyhledejte soubor. Vyberte ji a potom klikněte na Kopírovat cestu k souboru svazku.

Následující příklad čte ze *.csv souboru, ale DataFrameReader podporuje nahrávání souborů v mnoha dalších formátech. Viz Metody DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Další informace o svazcích katalogu Unity najdete v tématu Co jsou svazky katalogu Unity?.

Vytvoření datového rámce z odpovědi JSON

Pokud chcete vytvořit datový rámec z datové části odpovědi JSON vrácené rozhraním REST API, použijte balíček Pythonu requests k dotazování a analýze odpovědi. Abyste ho mohli použít, musíte balíček importovat. Tento příklad používá data z databáze aplikace drog USA Food and Drug Administration.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Informace o práci s JSON a dalšími částečně strukturovanými daty v Databricks najdete v tématu Částečně strukturovaná data modelu.

Výběr pole nebo objektu JSON

Pokud chcete vybrat konkrétní pole nebo objekt z převedeného formátu JSON, použijte [] zápis. Chcete-li například vybrat products pole, které je samotné pole produktů:

display(df_drugs.select(df_drugs["products"]))

Můžete také zřetězit volání metod pro procházení více polí. Například pro výstup názvu značky prvního produktu v aplikaci léku:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Vytvoření datového rámce ze souboru

Abychom si ukázali vytvoření datového rámce ze souboru, tento příklad načte data CSV v /databricks-datasets adresáři.

K přechodu na ukázkové datové sady můžete použít příkazy systému souborů Databricks Utilties . Následující příklad používá dbutils k výpisu datových sad dostupných v /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Alternativně můžete použít %fs přístup k příkazům systému souborů Rozhraní příkazového řádku Databricks, jak je znázorněno v následujícím příkladu:

%fs ls '/databricks-datasets'

Pokud chcete vytvořit datový rámec ze souboru nebo adresáře souborů, zadejte cestu v load metodě:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Transformace dat pomocí datových rámců

Datové rámce usnadňují transformaci dat pomocí předdefinovaných metod řazení, filtrování a agregace dat. Mnoho transformací není určeno jako metody v datových rámcích, ale místo toho jsou součástí spark.sql.functions balíčku. Viz Funkce SQL Pro Databricks Spark.

Operace se sloupci

Spark poskytuje mnoho základních operací se sloupci:

Tip

K výstupu všech sloupců v datovém rámci použijte columnsnapříklad df_customer.columns.

Vyberte sloupce

Můžete vybrat konkrétní sloupce pomocí select a col. Funkce col je v dílčím pyspark.sql.functions modulu.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Můžete také odkazovat na sloupec, expr který používá výraz definovaný jako řetězec:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Můžete také použít selectExpr, který přijímá výrazy SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Pokud chcete vybrat sloupce pomocí řetězcového literálu, postupujte takto:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Pokud chcete explicitně vybrat sloupec z konkrétního datového rámce, můžete použít [] operátor nebo . operátor. (Operátor . nelze použít k výběru sloupců začínajících celým číslem nebo sloupce, které obsahují mezeru nebo speciální znak.) To může být užitečné zejména při připojování datových rámců, kde mají některé sloupce stejný název.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Vytvoření sloupců

K vytvoření nového sloupce použijte metodu withColumn . Následující příklad vytvoří nový sloupec, který obsahuje logickou hodnotu na základě toho, jestli zůstatek c_acctbal na účtu zákazníka 1000překračuje:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Přejmenování sloupců

Pokud chcete přejmenovat sloupec, použijte metodu withColumnRenamed , která přijímá existující a nové názvy sloupců:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Tato alias metoda je užitečná hlavně v případech, kdy chcete sloupce přejmenovat jako součást agregací:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Přetypování typů sloupců

V některých případech můžete chtít změnit datový typ pro jeden nebo více sloupců v datovém rámci. K tomu použijte metodu cast pro převod mezi datovými typy sloupců. Následující příklad ukazuje, jak převést sloupec z celého čísla na typ řetězce pomocí col metody odkazování na sloupec:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Odebrání sloupců

Pokud chcete sloupce odebrat, můžete sloupce vynechat během výběru nebo select(*) except můžete použít metodu drop :

df_customer_flag_renamed.drop("balance_flag_renamed")

Můžete také odstranit více sloupců najednou:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Operace řádků

Spark poskytuje mnoho základních operací řádků:

Řádky filtru

Chcete-li filtrovat řádky, použijte metodu filter datového where rámce k vrácení pouze určitých řádků. K identifikaci sloupce, podle kterého se má filtrovat, použijte metodu col nebo výraz, který se vyhodnotí jako sloupec.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Pokud chcete filtrovat podle více podmínek, použijte logické operátory. Můžete například & | a umožnit vám to AND a OR podmínky. Následující příklad filtruje řádky, ve kterých je rovna c_nationkey 20 a c_acctbal je větší než 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Odebrání duplicitních řádků

Pokud chcete zrušit duplicitní řádky, použijte distinctpříkaz , který vrátí pouze jedinečné řádky.

df_unique = df_customer.distinct()

Zpracování hodnot null

Chcete-li zpracovat hodnoty null, odstraňte řádky, které obsahují hodnoty null pomocí na.drop metody. Tato metoda umožňuje určit, zda chcete vypustit řádky obsahující any hodnoty null nebo all hodnoty null.

K vyřazení všech hodnot null použijte některý z následujících příkladů.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Pokud místo toho chcete vyfiltrovat pouze řádky, které obsahují všechny hodnoty null, použijte následující:

df_customer_no_nulls = df_customer.na.drop("all")

Tuto možnost můžete použít pro podmnožinu sloupců tak, že ji zadáte, jak je znázorněno níže:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

K vyplnění chybějících hodnot použijte metodu fill . Tuto možnost můžete použít pro všechny sloupce nebo podmnožinu sloupců. V následujícím příkladu jsou zůstatky účtů, které mají hodnotu null pro zůstatek c_acctbal účtu, vyplněny 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

K nahrazení řetězců jinými hodnotami použijte metodu replace . V následujícím příkladu jsou všechny prázdné řetězce adres nahrazeny slovem UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Přidávat řádky

Pokud chcete přidat řádky, musíte použít metodu union k vytvoření nového datového rámce. V následujícím příkladu se datový rámec df_that_one_customer vytvořený dříve a df_filtered_customer zkombinuje se, který vrátí datový rámec se třemi zákazníky:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Poznámka:

Datové rámce můžete také zkombinovat tak, že je zapíšete do tabulky a pak připojíte nové řádky. U produkčních úloh může přírůstkové zpracování zdrojů dat do cílové tabulky výrazně snížit latenci a náklady na výpočetní prostředky při růstu dat. Podívejte se na Ingestování dat do databricks lakehouse.

Řadit řádky

Důležité

Řazení může být nákladné ve velkém měřítku a pokud ukládáte seřazená data a data znovu načítáte pomocí Sparku, pořadí není zaručené. Ujistěte se, že používáte řazení záměrně.

Pokud chcete řadit řádky podle jednoho nebo více sloupců, použijte metodu nebo orderBy metodusort. Ve výchozím nastavení se tyto metody seřadí ve vzestupném pořadí:

df_customer.orderBy(col("c_acctbal"))

Pokud chcete filtrovat sestupně, použijte desc:

df_customer.sort(col("c_custkey").desc())

Následující příklad ukazuje, jak řadit podle dvou sloupců:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Pokud chcete omezit počet řádků, které se mají vrátit po seřazení datového rámce, použijte metodu limit . Následující příklad zobrazí pouze nejlepší 10 výsledky:

display(df_sorted.limit(10))

Spojení datových rámců

Chcete-li spojit dva nebo více datových rámců, použijte metodu join . Můžete určit, jak se mají datové rámce spojit v parametrech how (typ spojení) a on (na kterých sloupcích se má spojení založit). Mezi běžné typy spojení patří:

  • inner: Toto je výchozí typ spojení, který vrací datový rámec, který uchovává pouze řádky, ve kterých je shoda pro on parametr napříč datovými rámci.
  • left: Tím se zachová všechny řádky prvního zadaného datového rámce a pouze řádky z druhého zadaného datového rámce, které mají shodu s prvním datovým rámcem.
  • outer: Vnější spojení uchovává všechny řádky z obou datových rámců bez ohledu na shodu.

Podrobné informace o spojeních najdete v tématu Práce s spojeními v Azure Databricks. Seznam spojení podporovaných v PySparku najdete v tématu Spojení datového rámce.

Následující příklad vrátí jeden datový rámec, kde každý řádek datového orders rámce je spojen s odpovídajícím řádkem z objektu customers DataFrame. Použije se vnitřní spojení, protože očekává se, že každá objednávka odpovídá přesně jednomu zákazníkovi.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Pokud se chcete spojit s několika podmínkami, použijte logické operátory, jako & jsou a | určit AND a ORv uvedeném pořadí. Následující příklad přidá další podmínku, která filtruje pouze řádky, které mají o_totalprice větší než 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Agregace dat

Pokud chcete agregovat data v datovém rámci, podobně jako GROUP BY v SQL, použijte groupBy metodu k určení sloupců, podle kterých se mají seskupit, a agg metody k určení agregací. Import společných agregací, včetně avg, sum, maxa min z pyspark.sql.functions. Následující příklad ukazuje průměrný zůstatek zákazníků podle segmentu trhu:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Některé agregace jsou akce, což znamená, že aktivují výpočty. V takovém případě nemusíte k výstupu výsledků používat jiné akce.

Pokud chcete spočítat řádky v datovém rámci, použijte metodu count :

df_customer.count()

Řetězení volání

Metody, které transformují datové rámce, vrací datové rámce a Spark nečiní transformace, dokud nebudou volány akce. Toto opožděné vyhodnocení znamená, že můžete zřetězovat více metod pro usnadnění a čitelnost. Následující příklad ukazuje, jak zřetězovat filtrování, agregaci a řazení:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Vizualizace datového rámce

Pokud chcete vizualizovat datový rámec v poznámkovém bloku, klikněte na + znaménko vedle tabulky v levém horním rohu datového rámce a pak vyberte Vizualizace a přidejte jeden nebo více grafů založených na datovém rámci. Podrobnosti o vizualizacích najdete v tématu Vizualizace v poznámkových blocích Databricks.

display(df_order)

Aby bylo možné provádět další vizualizace, Databricks doporučuje používat rozhraní PANDAS API pro Spark. Umožňuje .pandas_api() přetypovat do odpovídajícího rozhraní PANDAS API pro datový rámec Sparku. Další informace najdete v tématu Rozhraní Pandas API ve Sparku.

Uložení dat

Jakmile data transformujete, můžete je uložit pomocí DataFrameWriter metod. Úplný seznam těchto metod najdete v objektu DataFrameWriter. Následující části ukazují, jak datový rámec uložit jako tabulku a jako kolekci datových souborů.

Uložení datového rámce jako tabulky

Pokud chcete datový rámec uložit jako tabulku v katalogu Unity, použijte metodu write.saveAsTable a zadejte cestu ve formátu <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Zápis datového rámce jako SOUBORU CSV

K zápisu datového rámce do *.csv formátu použijte metodu write.csv a určete formát a možnosti. Ve výchozím nastavení platí, že pokud data existují v zadané cestě, operace zápisu selže. Pokud chcete provést jinou akci, můžete zadat jeden z následujících režimů:

  • overwrite přepíše všechna existující data v cílové cestě obsahem datového rámce.
  • append připojí obsah datového rámce k datům v cílové cestě.
  • ignore bezobslužně selže zápis, pokud data v cílové cestě existují.

Následující příklad ukazuje přepsání dat s obsahem datového rámce jako soubory CSV:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Další kroky

Pokud chcete využít více možností Sparku v Databricks, přečtěte si: