Dela via


Grunderna i PySpark

Den här artikeln går igenom enkla exempel för att illustrera användningen av PySpark. Det förutsätter att du förstår grundläggande Apache Spark-begrepp och kör kommandon i en Azure Databricks-notebook-fil som är ansluten till beräkning. Du skapar DataFrames med hjälp av exempeldata, utför grundläggande transformeringar, inklusive rad- och kolumnåtgärder på dessa data, kombinerar flera DataFrames och aggregerar dessa data, visualiserar dessa data och sparar dem sedan i en tabell eller fil.

Ladda upp data

Några exempel i den här artikeln använder Databricks-exempeldata för att demonstrera användning av DataFrames för att läsa in, transformera och spara data. Om du vill använda dina egna data som ännu inte finns i Databricks kan du ladda upp dem först och skapa en DataFrame från den. Se Skapa eller ändra en tabell med filuppladdning och ladda upp filer till en Unity Catalog-volym.

Om Databricks-exempeldata

Databricks tillhandahåller exempeldata i samples katalogen och i /databricks-datasets katalogen.

  • Om du vill komma åt exempeldata i samples katalogen använder du formatet samples.<schema-name>.<table-name>. Den här artikeln använder tabeller i samples.tpch schemat, som innehåller data från ett fiktivt företag. Tabellen customer innehåller information om kunder och orders innehåller information om beställningar från dessa kunder.
  • Använd dbutils.fs.ls för att utforska data i /databricks-datasets. Använd Spark SQL eller DataFrames för att fråga efter data på den här platsen med hjälp av filsökvägar. Mer information om Databricks-exempeldata finns i Exempeldatauppsättningar.

Importera datatyper

Många PySpark-åtgärder kräver att du använder SQL-funktioner eller interagerar med inbyggda Spark-typer. Du kan antingen importera endast de funktioner och typer som du behöver direkt, eller så kan du importera hela modulen.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Eftersom vissa importerade funktioner kan åsidosätta inbyggda Python-funktioner väljer vissa användare att importera dessa moduler med hjälp av ett alias. I följande exempel visas ett vanligt alias som används i Apache Spark-kodexempel:

import pyspark.sql.types as T
import pyspark.sql.functions as F

En omfattande lista över datatyper finns i Spark-datatyper.

En omfattande lista över PySpark SQL-funktioner finns i Spark Functions.

Skapa en dataram

Det finns flera sätt att skapa en DataFrame. Vanligtvis definierar du en DataFrame mot en datakälla, till exempel en tabell eller en samling filer. Använd sedan en åtgärd som , enligt beskrivningen i avsnittet grundläggande begrepp i Apache Spark, displayför att utlösa omvandlingarna som ska köras. Metoden display matar ut DataFrames.

Skapa en DataFrame med angivna värden

Om du vill skapa en DataFrame med angivna värden använder du createDataFrame metoden där rader uttrycks som en lista över tupplar:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Observera i utdata att datatyperna för kolumner df_children automatiskt härleds. Du kan också ange typerna genom att lägga till ett schema. Scheman definieras med hjälp StructType av som består av StructFields som anger namn, datatyp och en boolesk flagga som anger om de innehåller ett null-värde eller inte. Du måste importera datatyper från pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Skapa en dataram från en tabell i Unity Catalog

Om du vill skapa en DataFrame från en tabell i Unity Catalog använder du table metoden som identifierar tabellen med formatet <catalog-name>.<schema-name>.<table-name>. Klicka på Katalog i det vänstra navigeringsfältet för att använda Katalogutforskaren för att navigera till tabellen. Klicka på den och välj sedan Kopiera tabellsökväg för att infoga tabellsökvägen i notebook-filen.

I följande exempel läses tabellen samples.tpch.customerin , men du kan också ange sökvägen till din egen tabell.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Skapa en dataram från en uppladdad fil

Om du vill skapa en DataFrame från en fil som du laddade upp till Unity Catalog-volymer använder du read egenskapen . Den här metoden returnerar ett DataFrameReader, som du sedan kan använda för att läsa lämpligt format. Klicka på katalogalternativet i det lilla sidofältet till vänster och använd katalogwebbläsaren för att hitta filen. Välj den och klicka sedan på Sökvägen kopiera volymfil.

Exemplet nedan läser från en *.csv fil, men DataFrameReader stöder uppladdning av filer i många andra format. Se DataFrameReader-metoder.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Mer information om Unity Catalog-volymer finns i Vad är Unity Catalog-volymer?.

Skapa en DataFrame från ett JSON-svar

Om du vill skapa en DataFrame från en JSON-svarsnyttolast som returneras av ett REST-API använder du Python-paketet requests för att fråga och parsa svaret. Du måste importera paketet för att kunna använda det. I det här exemplet används data från USA Food and Drug Administrations databas för läkemedelsprogram.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Information om hur du arbetar med JSON och andra halvstrukturerade data på Databricks finns i Modellera halvstrukturerade data.

Välj ett JSON-fält eller -objekt

Om du vill välja ett specifikt fält eller objekt från den konverterade JSON-filen använder du notationen [] . Om du till exempel vill välja det products fält som i sig är en matris med produkter:

display(df_drugs.select(df_drugs["products"]))

Du kan också länka ihop metodanrop för att korsa flera fält. Om du till exempel vill mata ut namnet på den första produkten i ett läkemedelsprogram:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Skapa en dataram från en fil

För att demonstrera skapandet av en DataFrame från en fil läser det här exemplet in CSV-data i /databricks-datasets katalogen.

Om du vill navigera till exempeldatauppsättningarna kan du använda filsystemkommandona Databricks Utilties . I följande exempel används dbutils för att lista de datauppsättningar som är tillgängliga i /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Du kan också använda %fs för att komma åt Databricks CLI-filsystemkommandon, som du ser i följande exempel:

%fs ls '/databricks-datasets'

Om du vill skapa en DataFrame från en fil eller katalog med filer anger du sökvägen i load metoden:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Transformera data med DataFrames

DataFrames gör det enkelt att transformera data med hjälp av inbyggda metoder för att sortera, filtrera och aggregera data. Många transformeringar anges inte som metoder i DataFrames, utan tillhandahålls i stället i spark.sql.functions paketet. Se Databricks Spark SQL Functions.

Kolumnåtgärder

Spark innehåller många grundläggande kolumnåtgärder:

Dricks

Om du vill mata ut alla kolumner i en DataFrame använder du columns, till exempel df_customer.columns.

Välj kolumner

Du kan välja specifika kolumner med hjälp av select och col. Funktionen col finns i undermodulen pyspark.sql.functions .

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Du kan också referera till en kolumn som använder expr ett uttryck som definieras som en sträng:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Du kan också använda selectExpr, som accepterar SQL-uttryck:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Gör följande för att välja kolumner med hjälp av en strängliteral:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Om du uttryckligen vill välja en kolumn från en specifik DataFrame kan du använda operatorn [] eller operatorn . . (Operatorn . kan inte användas för att välja kolumner som börjar med ett heltal eller som innehåller ett blanksteg eller specialtecken.) Detta kan vara särskilt användbart när du ansluter till DataFrames där vissa kolumner har samma namn.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Skapa kolumner

Använd metoden för withColumn att skapa en ny kolumn. I följande exempel skapas en ny kolumn som innehåller ett booleskt värde baserat på om kundkontots saldo c_acctbal överskrider 1000:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Byta namn på kolumner

Om du vill byta namn på en kolumn använder du withColumnRenamed metoden som accepterar de befintliga och nya kolumnnamnen:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Metoden alias är särskilt användbar när du vill byta namn på kolumnerna som en del av aggregeringarna:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Typer av gjutna kolumner

I vissa fall kanske du vill ändra datatypen för en eller flera av kolumnerna i dataramen. Det gör du genom att använda cast metoden för att konvertera mellan kolumndatatyper. I följande exempel visas hur du konverterar en kolumn från ett heltal till strängtyp med hjälp av col metoden för att referera till en kolumn:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Ta bort kolumner

Om du vill ta bort kolumner kan du utelämna kolumner under ett val eller select(*) except använda drop metoden:

df_customer_flag_renamed.drop("balance_flag_renamed")

Du kan också släppa flera kolumner samtidigt:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Radåtgärder

Spark tillhandahåller många grundläggande radåtgärder:

Filtrera rader

Om du vill filtrera rader använder du filter metoden eller where på en DataFrame för att endast returnera vissa rader. Om du vill identifiera en kolumn att filtrera på använder du metoden col eller ett uttryck som utvärderas till en kolumn.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Om du vill filtrera efter flera villkor använder du logiska operatorer. Till exempel & och | gör det möjligt för dig att AND och OR villkor, respektive. I följande exempel filtreras rader där c_nationkey är lika med 20 och c_acctbal är större än 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Ta bort dubblettrader

Om du vill avduplicera rader använder du distinct, som endast returnerar de unika raderna.

df_unique = df_customer.distinct()

Hantera null-värden

Om du vill hantera null-värden släpper du rader som innehåller null-värden med hjälp av na.drop metoden . Med den här metoden kan du ange om du vill släppa rader som innehåller any null-värden eller all null-värden.

Om du vill släppa null-värden använder du något av följande exempel.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Om du i stället bara vill filtrera bort rader som innehåller alla null-värden använder du följande:

df_customer_no_nulls = df_customer.na.drop("all")

Du kan använda detta för en delmängd av kolumnerna genom att ange detta enligt nedan:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Om du vill fylla i saknade värden använder du fill metoden . Du kan välja att tillämpa detta på alla kolumner eller en delmängd av kolumner. I exemplet nedan fylls kontosaldon som har ett null-värde för sitt kontosaldo c_acctbal med 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Om du vill ersätta strängar med andra värden använder du replace metoden . I exemplet nedan ersätts alla tomma adresssträngar med ordet UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Lägg till rader

Om du vill lägga till rader måste du använda union metoden för att skapa en ny DataFrame. I följande exempel kombineras dataramen df_that_one_customer som skapades tidigare och df_filtered_customer som returnerar en DataFrame med tre kunder:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Kommentar

Du kan också kombinera DataFrames genom att skriva dem till en tabell och sedan lägga till nya rader. För produktionsarbetsbelastningar kan inkrementell bearbetning av datakällor till en måltabell drastiskt minska svarstiden och beräkningskostnaderna när data växer i storlek. Se Mata in data i en Databricks lakehouse.

Sortera rader

Viktigt!

Sortering kan vara dyrt i stor skala, och om du lagrar sorterade data och läser in data igen med Spark garanteras inte ordningen. Se till att du är avsiktlig när du använder sortering.

Om du vill sortera rader efter en eller flera kolumner använder du sort metoden eller orderBy . Som standard sorterar dessa metoder i stigande ordning:

df_customer.orderBy(col("c_acctbal"))

Om du vill filtrera i fallande ordning använder du desc:

df_customer.sort(col("c_custkey").desc())

I följande exempel visas hur du sorterar efter två kolumner:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Om du vill begränsa antalet rader som ska returneras när DataFrame har sorterats använder du limit metoden . I följande exempel visas endast de främsta 10 resultaten:

display(df_sorted.limit(10))

Ansluta till DataFrames

Om du vill ansluta två eller flera DataFrames använder du join metoden. Du kan ange hur du vill att DataFrames ska vara anslutna till parametrarna how (kopplingstypen) och on (på vilka kolumner som kopplingsparametrarna ska baseras). Vanliga kopplingstyper är:

  • inner: Detta är standardinställningen för kopplingstypen, som returnerar en DataFrame som endast behåller de rader där det finns en matchning för parametern on i DataFrames.
  • left: Detta behåller alla rader i den första angivna DataFrame och endast rader från den andra angivna DataFrame som har en matchning med den första.
  • outer: En yttre koppling behåller alla rader från båda DataFrames oavsett matchning.

Detaljerad information om kopplingar finns i Arbeta med kopplingar i Azure Databricks. En lista över kopplingar som stöds i PySpark finns i DataFrame-kopplingar.

I följande exempel returneras en enda DataFrame där varje rad i orders DataFrame är kopplad till motsvarande rad från customers DataFrame. En inre koppling används, eftersom förväntningarna är att varje beställning motsvarar exakt en kund.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Om du vill ansluta på flera villkor använder du booleska operatorer som & och | för att ange ANDORrespektive . Följande exempel lägger till ytterligare ett villkor som filtrerar till bara de rader som har o_totalprice större än 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Aggregera data

Om du vill aggregera data i en DataFrame, liknande en GROUP BY i SQL, använder du groupBy metoden för att ange kolumner som ska grupperas efter och agg metoden för att ange sammansättningar. Importera vanliga sammansättningar, inklusive avg, sum, maxoch min från pyspark.sql.functions. I följande exempel visas det genomsnittliga kundsaldot per marknadssegment:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Vissa aggregeringar är åtgärder, vilket innebär att de utlöser beräkningar. I det här fallet behöver du inte använda andra åtgärder för att mata ut resultat.

Om du vill räkna rader i en DataFrame använder du count metoden:

df_customer.count()

Länkningsanrop

Metoder som transformerar DataFrames returnerar DataFrames, och Spark agerar inte på transformeringar förrän åtgärder anropas. Den här lata utvärderingen innebär att du kan länka flera metoder för bekvämlighet och läsbarhet. I följande exempel visas hur du kedjar filtrering, aggregering och ordning:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Visualisera dataramen

Om du vill visualisera en DataFrame i en notebook-fil klickar du på + tecknet bredvid tabellen längst upp till vänster i DataFrame och väljer sedan Visualisering för att lägga till ett eller flera diagram baserat på din DataFrame. Mer information om visualiseringar finns i Visualiseringar i Databricks Notebooks.

display(df_order)

För att utföra ytterligare visualiseringar rekommenderar Databricks att du använder Pandas API för Spark. Gör .pandas_api() att du kan casta till motsvarande Pandas API för en Spark DataFrame. Mer information finns i Pandas API på Spark.

Spara dina data

När du har omvandlat dina data kan du spara dem med hjälp av DataFrameWriter metoderna. En fullständig lista över dessa metoder finns i DataFrameWriter. Följande avsnitt visar hur du sparar dataramen som en tabell och som en samling datafiler.

Spara dataramen som en tabell

Om du vill spara dataramen som en tabell i Unity Catalog använder du write.saveAsTable metoden och anger sökvägen i formatet <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Skriva dataramen som CSV

Om du vill skriva dataramen *.csv för att write.csv formatera använder du metoden och anger format och alternativ. Om data finns på den angivna sökvägen misslyckas skrivåtgärden som standard. Du kan ange något av följande lägen för att utföra en annan åtgärd:

  • overwrite skriver över alla befintliga data i målsökvägen med DataFrame-innehållet.
  • append lägger till innehållet i DataFrame till data i målsökvägen.
  • ignore misslyckas tyst skrivningen om det finns data i målsökvägen.

I följande exempel visas hur du skriver över data med DataFrame-innehåll som CSV-filer:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Nästa steg

Mer information om hur du använder Fler Spark-funktioner på Databricks finns i: