Grunderna i PySpark
Den här artikeln går igenom enkla exempel för att illustrera användningen av PySpark. Det förutsätter att du förstår grundläggande Apache Spark-begrepp och kör kommandon i en Azure Databricks-notebook-fil som är ansluten till beräkning. Du skapar DataFrames med hjälp av exempeldata, utför grundläggande transformeringar, inklusive rad- och kolumnåtgärder på dessa data, kombinerar flera DataFrames och aggregerar dessa data, visualiserar dessa data och sparar dem sedan i en tabell eller fil.
Ladda upp data
Några exempel i den här artikeln använder Databricks-exempeldata för att demonstrera användning av DataFrames för att läsa in, transformera och spara data. Om du vill använda dina egna data som ännu inte finns i Databricks kan du ladda upp dem först och skapa en DataFrame från den. Se Skapa eller ändra en tabell med filuppladdning och ladda upp filer till en Unity Catalog-volym.
Om Databricks-exempeldata
Databricks tillhandahåller exempeldata i samples
katalogen och i /databricks-datasets
katalogen.
- Om du vill komma åt exempeldata i
samples
katalogen använder du formatetsamples.<schema-name>.<table-name>
. Den här artikeln använder tabeller isamples.tpch
schemat, som innehåller data från ett fiktivt företag. Tabellencustomer
innehåller information om kunder ochorders
innehåller information om beställningar från dessa kunder. - Använd
dbutils.fs.ls
för att utforska data i/databricks-datasets
. Använd Spark SQL eller DataFrames för att fråga efter data på den här platsen med hjälp av filsökvägar. Mer information om Databricks-exempeldata finns i Exempeldatauppsättningar.
Importera datatyper
Många PySpark-åtgärder kräver att du använder SQL-funktioner eller interagerar med inbyggda Spark-typer. Du kan antingen importera endast de funktioner och typer som du behöver direkt, eller så kan du importera hela modulen.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Eftersom vissa importerade funktioner kan åsidosätta inbyggda Python-funktioner väljer vissa användare att importera dessa moduler med hjälp av ett alias. I följande exempel visas ett vanligt alias som används i Apache Spark-kodexempel:
import pyspark.sql.types as T
import pyspark.sql.functions as F
En omfattande lista över datatyper finns i Spark-datatyper.
En omfattande lista över PySpark SQL-funktioner finns i Spark Functions.
Skapa en dataram
Det finns flera sätt att skapa en DataFrame. Vanligtvis definierar du en DataFrame mot en datakälla, till exempel en tabell eller en samling filer. Använd sedan en åtgärd som , enligt beskrivningen i avsnittet grundläggande begrepp i Apache Spark, display
för att utlösa omvandlingarna som ska köras. Metoden display
matar ut DataFrames.
Skapa en DataFrame med angivna värden
Om du vill skapa en DataFrame med angivna värden använder du createDataFrame
metoden där rader uttrycks som en lista över tupplar:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
Observera i utdata att datatyperna för kolumner df_children
automatiskt härleds. Du kan också ange typerna genom att lägga till ett schema. Scheman definieras med hjälp StructType
av som består av StructFields
som anger namn, datatyp och en boolesk flagga som anger om de innehåller ett null-värde eller inte. Du måste importera datatyper från pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Skapa en dataram från en tabell i Unity Catalog
Om du vill skapa en DataFrame från en tabell i Unity Catalog använder du table
metoden som identifierar tabellen med formatet <catalog-name>.<schema-name>.<table-name>
. Klicka på Katalog i det vänstra navigeringsfältet för att använda Katalogutforskaren för att navigera till tabellen. Klicka på den och välj sedan Kopiera tabellsökväg för att infoga tabellsökvägen i notebook-filen.
I följande exempel läses tabellen samples.tpch.customer
in , men du kan också ange sökvägen till din egen tabell.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Skapa en dataram från en uppladdad fil
Om du vill skapa en DataFrame från en fil som du laddade upp till Unity Catalog-volymer använder du read
egenskapen . Den här metoden returnerar ett DataFrameReader
, som du sedan kan använda för att läsa lämpligt format. Klicka på katalogalternativet i det lilla sidofältet till vänster och använd katalogwebbläsaren för att hitta filen. Välj den och klicka sedan på Sökvägen kopiera volymfil.
Exemplet nedan läser från en *.csv
fil, men DataFrameReader
stöder uppladdning av filer i många andra format. Se DataFrameReader-metoder.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Mer information om Unity Catalog-volymer finns i Vad är Unity Catalog-volymer?.
Skapa en DataFrame från ett JSON-svar
Om du vill skapa en DataFrame från en JSON-svarsnyttolast som returneras av ett REST-API använder du Python-paketet requests
för att fråga och parsa svaret. Du måste importera paketet för att kunna använda det. I det här exemplet används data från USA Food and Drug Administrations databas för läkemedelsprogram.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Information om hur du arbetar med JSON och andra halvstrukturerade data på Databricks finns i Modellera halvstrukturerade data.
Välj ett JSON-fält eller -objekt
Om du vill välja ett specifikt fält eller objekt från den konverterade JSON-filen använder du notationen []
. Om du till exempel vill välja det products
fält som i sig är en matris med produkter:
display(df_drugs.select(df_drugs["products"]))
Du kan också länka ihop metodanrop för att korsa flera fält. Om du till exempel vill mata ut namnet på den första produkten i ett läkemedelsprogram:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Skapa en dataram från en fil
För att demonstrera skapandet av en DataFrame från en fil läser det här exemplet in CSV-data i /databricks-datasets
katalogen.
Om du vill navigera till exempeldatauppsättningarna kan du använda filsystemkommandona Databricks Utilties . I följande exempel används dbutils
för att lista de datauppsättningar som är tillgängliga i /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
Du kan också använda %fs
för att komma åt Databricks CLI-filsystemkommandon, som du ser i följande exempel:
%fs ls '/databricks-datasets'
Om du vill skapa en DataFrame från en fil eller katalog med filer anger du sökvägen i load
metoden:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Transformera data med DataFrames
DataFrames gör det enkelt att transformera data med hjälp av inbyggda metoder för att sortera, filtrera och aggregera data. Många transformeringar anges inte som metoder i DataFrames, utan tillhandahålls i stället i spark.sql.functions
paketet. Se Databricks Spark SQL Functions.
Kolumnåtgärder
Spark innehåller många grundläggande kolumnåtgärder:
Dricks
Om du vill mata ut alla kolumner i en DataFrame använder du columns
, till exempel df_customer.columns
.
Välj kolumner
Du kan välja specifika kolumner med hjälp av select
och col
. Funktionen col
finns i undermodulen pyspark.sql.functions
.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
Du kan också referera till en kolumn som använder expr
ett uttryck som definieras som en sträng:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
Du kan också använda selectExpr
, som accepterar SQL-uttryck:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Gör följande för att välja kolumner med hjälp av en strängliteral:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Om du uttryckligen vill välja en kolumn från en specifik DataFrame kan du använda operatorn []
eller operatorn .
. (Operatorn .
kan inte användas för att välja kolumner som börjar med ett heltal eller som innehåller ett blanksteg eller specialtecken.) Detta kan vara särskilt användbart när du ansluter till DataFrames där vissa kolumner har samma namn.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Skapa kolumner
Använd metoden för withColumn
att skapa en ny kolumn. I följande exempel skapas en ny kolumn som innehåller ett booleskt värde baserat på om kundkontots saldo c_acctbal
överskrider 1000
:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Byta namn på kolumner
Om du vill byta namn på en kolumn använder du withColumnRenamed
metoden som accepterar de befintliga och nya kolumnnamnen:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
Metoden alias
är särskilt användbar när du vill byta namn på kolumnerna som en del av aggregeringarna:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Typer av gjutna kolumner
I vissa fall kanske du vill ändra datatypen för en eller flera av kolumnerna i dataramen. Det gör du genom att använda cast
metoden för att konvertera mellan kolumndatatyper. I följande exempel visas hur du konverterar en kolumn från ett heltal till strängtyp med hjälp av col
metoden för att referera till en kolumn:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Ta bort kolumner
Om du vill ta bort kolumner kan du utelämna kolumner under ett val eller select(*) except
använda drop
metoden:
df_customer_flag_renamed.drop("balance_flag_renamed")
Du kan också släppa flera kolumner samtidigt:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Radåtgärder
Spark tillhandahåller många grundläggande radåtgärder:
- Filtrera rader
- Ta bort dubblettrader
- Hantera null-värden
- Lägg till rader
- Sortera rader
- Filtrera rader
Filtrera rader
Om du vill filtrera rader använder du filter
metoden eller where
på en DataFrame för att endast returnera vissa rader. Om du vill identifiera en kolumn att filtrera på använder du metoden col
eller ett uttryck som utvärderas till en kolumn.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Om du vill filtrera efter flera villkor använder du logiska operatorer. Till exempel &
och |
gör det möjligt för dig att AND
och OR
villkor, respektive. I följande exempel filtreras rader där c_nationkey
är lika med 20
och c_acctbal
är större än 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Ta bort dubblettrader
Om du vill avduplicera rader använder du distinct
, som endast returnerar de unika raderna.
df_unique = df_customer.distinct()
Hantera null-värden
Om du vill hantera null-värden släpper du rader som innehåller null-värden med hjälp av na.drop
metoden . Med den här metoden kan du ange om du vill släppa rader som innehåller any
null-värden eller all
null-värden.
Om du vill släppa null-värden använder du något av följande exempel.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Om du i stället bara vill filtrera bort rader som innehåller alla null-värden använder du följande:
df_customer_no_nulls = df_customer.na.drop("all")
Du kan använda detta för en delmängd av kolumnerna genom att ange detta enligt nedan:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Om du vill fylla i saknade värden använder du fill
metoden . Du kan välja att tillämpa detta på alla kolumner eller en delmängd av kolumner. I exemplet nedan fylls kontosaldon som har ett null-värde för sitt kontosaldo c_acctbal
med 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Om du vill ersätta strängar med andra värden använder du replace
metoden . I exemplet nedan ersätts alla tomma adresssträngar med ordet UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Lägg till rader
Om du vill lägga till rader måste du använda union
metoden för att skapa en ny DataFrame. I följande exempel kombineras dataramen df_that_one_customer
som skapades tidigare och df_filtered_customer
som returnerar en DataFrame med tre kunder:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Kommentar
Du kan också kombinera DataFrames genom att skriva dem till en tabell och sedan lägga till nya rader. För produktionsarbetsbelastningar kan inkrementell bearbetning av datakällor till en måltabell drastiskt minska svarstiden och beräkningskostnaderna när data växer i storlek. Se Mata in data i en Databricks lakehouse.
Sortera rader
Viktigt!
Sortering kan vara dyrt i stor skala, och om du lagrar sorterade data och läser in data igen med Spark garanteras inte ordningen. Se till att du är avsiktlig när du använder sortering.
Om du vill sortera rader efter en eller flera kolumner använder du sort
metoden eller orderBy
. Som standard sorterar dessa metoder i stigande ordning:
df_customer.orderBy(col("c_acctbal"))
Om du vill filtrera i fallande ordning använder du desc
:
df_customer.sort(col("c_custkey").desc())
I följande exempel visas hur du sorterar efter två kolumner:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Om du vill begränsa antalet rader som ska returneras när DataFrame har sorterats använder du limit
metoden . I följande exempel visas endast de främsta 10
resultaten:
display(df_sorted.limit(10))
Ansluta till DataFrames
Om du vill ansluta två eller flera DataFrames använder du join
metoden. Du kan ange hur du vill att DataFrames ska vara anslutna till parametrarna how
(kopplingstypen) och on
(på vilka kolumner som kopplingsparametrarna ska baseras). Vanliga kopplingstyper är:
-
inner
: Detta är standardinställningen för kopplingstypen, som returnerar en DataFrame som endast behåller de rader där det finns en matchning för parameternon
i DataFrames. -
left
: Detta behåller alla rader i den första angivna DataFrame och endast rader från den andra angivna DataFrame som har en matchning med den första. -
outer
: En yttre koppling behåller alla rader från båda DataFrames oavsett matchning.
Detaljerad information om kopplingar finns i Arbeta med kopplingar i Azure Databricks. En lista över kopplingar som stöds i PySpark finns i DataFrame-kopplingar.
I följande exempel returneras en enda DataFrame där varje rad i orders
DataFrame är kopplad till motsvarande rad från customers
DataFrame. En inre koppling används, eftersom förväntningarna är att varje beställning motsvarar exakt en kund.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Om du vill ansluta på flera villkor använder du booleska operatorer som &
och |
för att ange AND
OR
respektive . Följande exempel lägger till ytterligare ett villkor som filtrerar till bara de rader som har o_totalprice
större än 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Aggregera data
Om du vill aggregera data i en DataFrame, liknande en GROUP BY
i SQL, använder du groupBy
metoden för att ange kolumner som ska grupperas efter och agg
metoden för att ange sammansättningar. Importera vanliga sammansättningar, inklusive avg
, sum
, max
och min
från pyspark.sql.functions
. I följande exempel visas det genomsnittliga kundsaldot per marknadssegment:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Vissa aggregeringar är åtgärder, vilket innebär att de utlöser beräkningar. I det här fallet behöver du inte använda andra åtgärder för att mata ut resultat.
Om du vill räkna rader i en DataFrame använder du count
metoden:
df_customer.count()
Länkningsanrop
Metoder som transformerar DataFrames returnerar DataFrames, och Spark agerar inte på transformeringar förrän åtgärder anropas. Den här lata utvärderingen innebär att du kan länka flera metoder för bekvämlighet och läsbarhet. I följande exempel visas hur du kedjar filtrering, aggregering och ordning:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Visualisera dataramen
Om du vill visualisera en DataFrame i en notebook-fil klickar du på + tecknet bredvid tabellen längst upp till vänster i DataFrame och väljer sedan Visualisering för att lägga till ett eller flera diagram baserat på din DataFrame. Mer information om visualiseringar finns i Visualiseringar i Databricks Notebooks.
display(df_order)
För att utföra ytterligare visualiseringar rekommenderar Databricks att du använder Pandas API för Spark. Gör .pandas_api()
att du kan casta till motsvarande Pandas API för en Spark DataFrame. Mer information finns i Pandas API på Spark.
Spara dina data
När du har omvandlat dina data kan du spara dem med hjälp av DataFrameWriter
metoderna. En fullständig lista över dessa metoder finns i DataFrameWriter. Följande avsnitt visar hur du sparar dataramen som en tabell och som en samling datafiler.
Spara dataramen som en tabell
Om du vill spara dataramen som en tabell i Unity Catalog använder du write.saveAsTable
metoden och anger sökvägen i formatet <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Skriva dataramen som CSV
Om du vill skriva dataramen *.csv
för att write.csv
formatera använder du metoden och anger format och alternativ. Om data finns på den angivna sökvägen misslyckas skrivåtgärden som standard. Du kan ange något av följande lägen för att utföra en annan åtgärd:
-
overwrite
skriver över alla befintliga data i målsökvägen med DataFrame-innehållet. -
append
lägger till innehållet i DataFrame till data i målsökvägen. -
ignore
misslyckas tyst skrivningen om det finns data i målsökvägen.
I följande exempel visas hur du skriver över data med DataFrame-innehåll som CSV-filer:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Nästa steg
Mer information om hur du använder Fler Spark-funktioner på Databricks finns i: