Basisbeginselen van PySpark
In dit artikel worden eenvoudige voorbeelden beschreven om het gebruik van PySpark te illustreren. Hierbij wordt ervan uitgegaan dat u de basisconcepten van Apache Spark begrijpt en opdrachten uitvoert in een Azure Databricks-notebook dat is verbonden met compute. U maakt DataFrames met voorbeeldgegevens, voert basistransformaties uit, inclusief rij- en kolombewerkingen op deze gegevens, combineert meerdere DataFrames en aggregeert deze gegevens, visualiseert deze gegevens en slaat deze vervolgens op in een tabel of bestand.
Gegevens uploaden
In sommige voorbeelden in dit artikel worden voorbeeldgegevens van Databricks gebruikt om te laten zien hoe u DataFrames gebruikt om gegevens te laden, transformeren en opslaan. Als u uw eigen gegevens wilt gebruiken die nog niet in Databricks aanwezig zijn, kunt u deze eerst uploaden en er een DataFrame van maken. Zie Een tabel maken of wijzigen met behulp van het uploaden en uploaden van bestanden naar een Unity Catalog-volume.
Over Databricks-voorbeeldgegevens
Databricks biedt voorbeeldgegevens in de samples
catalogus en in de /databricks-datasets
map.
- Gebruik de indeling
samples.<schema-name>.<table-name>
om toegang te krijgen tot de voorbeeldgegevens in desamples
catalogus. In dit artikel worden tabellen in hetsamples.tpch
schema gebruikt die gegevens uit een fictief bedrijf bevatten. Decustomer
tabel bevat informatie over klanten enorders
bevat informatie over orders die door deze klanten zijn geplaatst. - Gebruiken
dbutils.fs.ls
om gegevens in/databricks-datasets
te verkennen. Gebruik Spark SQL of DataFrames om gegevens op deze locatie op te vragen met behulp van bestandspaden. Zie Voorbeeldgegevenssets voor meer informatie over door Databricks geleverde voorbeeldgegevens.
Gegevenstypen importeren
Voor veel PySpark-bewerkingen moet u SQL-functies gebruiken of communiceren met systeemeigen Spark-typen. U kunt alleen de functies en typen die u nodig hebt rechtstreeks importeren, of u kunt de hele module importeren.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
Omdat sommige geïmporteerde functies mogelijk ingebouwde Python-functies overschrijven, kiezen sommige gebruikers ervoor om deze modules te importeren met behulp van een alias. In de volgende voorbeelden ziet u een algemene alias die wordt gebruikt in Apache Spark-codevoorbeelden:
import pyspark.sql.types as T
import pyspark.sql.functions as F
Zie Spark-gegevenstypen voor een uitgebreide lijst met gegevenstypen.
Zie Spark Functions voor een uitgebreide lijst met PySpark SQL-functies.
Een DataFrame maken
Er zijn verschillende manieren om een DataFrame te maken. Meestal definieert u een DataFrame op basis van een gegevensbron, zoals een tabel of verzameling bestanden. Gebruik vervolgens, zoals beschreven in de sectie basisconcepten van Apache Spark, een actie, zoals display
, om de transformaties te activeren die moeten worden uitgevoerd. De display
methode voert DataFrames uit.
Een DataFrame met opgegeven waarden maken
Als u een DataFrame met opgegeven waarden wilt maken, gebruikt u de createDataFrame
methode, waarbij rijen worden uitgedrukt als een lijst met tuples:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
U ziet in de uitvoer dat de gegevenstypen van kolommen df_children
automatisch worden afgeleid. U kunt de typen ook opgeven door een schema toe te voegen. Schema's worden gedefinieerd met behulp van StructFields
de StructType
opgegeven naam, het gegevenstype en een booleaanse vlag die aangeeft of ze een null-waarde bevatten of niet. U moet gegevenstypen importeren uit pyspark.sql.types
.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Een DataFrame maken op basis van een tabel in Unity Catalog
Als u een DataFrame wilt maken op basis van een tabel in Unity Catalog, gebruikt u de table
methode voor het identificeren van de tabel met de indeling <catalog-name>.<schema-name>.<table-name>
. Klik op Catalogus op de linkernavigatiebalk om Catalogusverkenner te gebruiken om naar uw tabel te navigeren. Klik erop en selecteer Vervolgens het pad naar de tabel kopiëren om het tabelpad in het notitieblok in te voegen.
In het volgende voorbeeld wordt de tabel samples.tpch.customer
geladen, maar u kunt ook het pad naar uw eigen tabel opgeven.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
Een DataFrame maken op basis van een geüpload bestand
Als u een DataFrame wilt maken van een bestand dat u hebt geüpload naar Unity Catalog-volumes, gebruikt u de read
eigenschap. Deze methode retourneert een DataFrameReader
, die u vervolgens kunt gebruiken om de juiste indeling te lezen. Klik op de catalogusoptie op de kleine zijbalk aan de linkerkant en gebruik de catalogusbrowser om uw bestand te zoeken. Selecteer deze en klik vervolgens op Pad naar volumebestand kopiëren.
Het onderstaande voorbeeld leest uit een *.csv
bestand, maar DataFrameReader
ondersteunt het uploaden van bestanden in veel andere indelingen. Zie DataFrameReader-methoden.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Zie Wat zijn Unity Catalog-volumes? voor meer informatie over Unity Catalog-volumes.
Een DataFrame maken op basis van een JSON-antwoord
Als u een DataFrame wilt maken op basis van een nettolading van een JSON-antwoord die wordt geretourneerd door een REST API, gebruikt u het Python-pakket requests
om een query uit te voeren en het antwoord te parseren. U moet het pakket importeren om het te gebruiken. In dit voorbeeld worden gegevens uit de database van de Verenigde Staten Food and Drug Administration-toepassingstoepassing gebruikt.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Zie Semi-gestructureerde modelgegevens voor informatie over het werken met JSON en andere semi-gestructureerde gegevens in Databricks.
Een JSON-veld of -object selecteren
Als u een specifiek veld of object wilt selecteren in de geconverteerde JSON, gebruikt u de []
notatie. Als u bijvoorbeeld het products
veld wilt selecteren dat zelf een matrix met producten is:
display(df_drugs.select(df_drugs["products"]))
U kunt ook methode-aanroepen koppelen om meerdere velden te doorlopen. Als u bijvoorbeeld de merknaam van het eerste product in een drugtoepassing wilt uitvoeren:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
Een DataFrame maken op basis van een bestand
Als u wilt laten zien hoe u een DataFrame maakt op basis van een bestand, worden in dit voorbeeld CSV-gegevens in de /databricks-datasets
map geladen.
Als u naar de voorbeeldgegevenssets wilt navigeren, kunt u de databricks Utilties-bestandssysteemopdrachten gebruiken. In het volgende voorbeeld worden dbutils
de gegevenssets vermeld die beschikbaar zijn in /databricks-datasets
:
display(dbutils.fs.ls('/databricks-datasets'))
U kunt ook toegang krijgen %fs
tot databricks CLI-bestandssysteemopdrachten, zoals wordt weergegeven in het volgende voorbeeld:
%fs ls '/databricks-datasets'
Als u een DataFrame wilt maken op basis van een bestand of map met bestanden, geeft u het pad op in de load
methode:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
Gegevens transformeren met DataFrames
Met DataFrames kunt u eenvoudig gegevens transformeren met behulp van ingebouwde methoden om gegevens te sorteren, filteren en aggregeren. Veel transformaties worden niet opgegeven als methoden in DataFrames, maar worden in plaats daarvan geleverd in het spark.sql.functions
pakket. Zie Databricks Spark SQL Functions.
Kolombewerkingen
Spark biedt veel basiskolombewerkingen:
- Kolommen selecteren
- Kolommen maken
- De naam van kolommen wijzigen
- Cast-kolomtypen
- Kolommen verwijderen
Tip
Als u alle kolommen in een DataFrame wilt uitvoeren, gebruikt columns
u bijvoorbeeld df_customer.columns
.
Kolommen selecteren
U kunt specifieke kolommen selecteren met en select
col
. De col
functie bevindt zich in de pyspark.sql.functions
submodule.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
U kunt ook verwijzen naar een kolom die expr
een expressie gebruikt die is gedefinieerd als een tekenreeks:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
U kunt ook SQL-expressies gebruiken selectExpr
:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
Ga als volgt te werk om kolommen te selecteren met een letterlijke tekenreeks:
df_customer.select(
"c_custkey",
"c_acctbal"
)
Als u expliciet een kolom uit een specifiek DataFrame wilt selecteren, kunt u de []
operator of de .
operator gebruiken. (De .
operator kan niet worden gebruikt om kolommen te selecteren die beginnen met een geheel getal of kolommen die een spatie of speciaal teken bevatten.) Dit kan met name handig zijn wanneer u dataframes koppelt waarbij sommige kolommen dezelfde naam hebben.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
Kolommen maken
Als u een nieuwe kolom wilt maken, gebruikt u de withColumn
methode. In het volgende voorbeeld wordt een nieuwe kolom gemaakt die een Booleaanse waarde bevat op basis van het feit of het saldo c_acctbal
van de klantrekening groter is 1000
dan:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
Namen van kolommen wijzigen
Als u de naam van een kolom wilt wijzigen, gebruikt u de withColumnRenamed
methode die de bestaande en nieuwe kolomnamen accepteert:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
De alias
methode is vooral handig als u de naam van uw kolommen wilt wijzigen als onderdeel van aggregaties:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
Cast-kolomtypen
In sommige gevallen kunt u het gegevenstype voor een of meer kolommen in uw DataFrame wijzigen. Hiervoor gebruikt u de cast
methode om te converteren tussen kolomgegevenstypen. In het volgende voorbeeld ziet u hoe u een kolom converteert van een geheel getal naar een tekenreekstype, met behulp van de col
methode om te verwijzen naar een kolom:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
Kolommen verwijderen
Als u kolommen wilt verwijderen, kunt u kolommen weglaten tijdens een selectie of select(*) except
kunt u de drop
methode gebruiken:
df_customer_flag_renamed.drop("balance_flag_renamed")
U kunt ook meerdere kolommen tegelijk verwijderen:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Rijbewerkingen
Spark biedt veel basisrijbewerkingen:
- Rijen filteren
- Dubbele rijen verwijderen
- Null-waarden verwerken
- Rijen toevoegen
- Rijen sorteren
- Rijen filteren
Rijen filteren
Als u rijen wilt filteren, gebruikt u de filter
of where
methode op een DataFrame om alleen bepaalde rijen te retourneren. Als u een kolom wilt identificeren waarop u wilt filteren, gebruikt u de col
methode of een expressie die resulteert in een kolom.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
Als u wilt filteren op meerdere voorwaarden, gebruikt u logische operators. U kunt bijvoorbeeld &
|
voorwaarden en OR
voorwaarden instellenAND
. In het volgende voorbeeld worden rijen gefilterd waarbij het c_nationkey
gelijk is aan 20
en c_acctbal
groter is dan 1000
.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
Dubbele rijen verwijderen
Als u dubbele rijen wilt verwijderen, gebruikt distinct
u deze functie, die alleen de unieke rijen retourneert.
df_unique = df_customer.distinct()
Null-waarden verwerken
Als u null-waarden wilt verwerken, verwijdert u rijen die null-waarden bevatten met behulp van de na.drop
methode. Met deze methode kunt u opgeven of u rijen any
met null-waarden of all
null-waarden wilt verwijderen.
Als u null-waarden wilt verwijderen, gebruikt u een van de volgende voorbeelden.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
Als u in plaats daarvan alleen rijen wilt filteren die alle null-waarden bevatten, gebruikt u het volgende:
df_customer_no_nulls = df_customer.na.drop("all")
U kunt dit toepassen op een subset kolommen door dit op te geven, zoals hieronder wordt weergegeven:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
Gebruik de fill
methode om ontbrekende waarden in te vullen. U kunt ervoor kiezen om dit toe te passen op alle kolommen of een subset van kolommen. In het onderstaande voorbeeld worden de accountsaldo's met een null-waarde voor hun rekeningsaldo c_acctbal
gevuld 0
.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
Gebruik de replace
methode om tekenreeksen te vervangen door andere waarden. In het onderstaande voorbeeld worden lege adrestekenreeksen vervangen door het woord UNKNOWN
:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
Rijen toevoegen
Als u rijen wilt toevoegen, moet u de union
methode gebruiken om een nieuw DataFrame te maken. In het volgende voorbeeld wordt het DataFrame df_that_one_customer
dat eerder is gemaakt en df_filtered_customer
gecombineerd, waarmee een DataFrame met drie klanten wordt geretourneerd:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
Notitie
U kunt DataFrames ook combineren door ze naar een tabel te schrijven en vervolgens nieuwe rijen toe te voegen. Voor productieworkloads kan incrementele verwerking van gegevensbronnen naar een doeltabel de latentie en de rekenkosten drastisch verminderen naarmate de gegevens groter worden. Zie Gegevens opnemen in een Databricks Lakehouse.
Rijen sorteren
Belangrijk
Sorteren kan duur zijn op schaal en als u gesorteerde gegevens opslaat en de gegevens opnieuw laadt met Spark, is de volgorde niet gegarandeerd. Zorg ervoor dat u opzettelijk sorteert.
Als u rijen wilt sorteren op een of meer kolommen, gebruikt u de sort
of orderBy
methode. Deze methoden worden standaard in oplopende volgorde gesorteerd:
df_customer.orderBy(col("c_acctbal"))
Als u wilt filteren in aflopende volgorde, gebruikt u desc
:
df_customer.sort(col("c_custkey").desc())
In het volgende voorbeeld ziet u hoe u kunt sorteren op twee kolommen:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
Gebruik de limit
methode om het aantal rijen te beperken dat moet worden geretourneerd zodra het DataFrame is gesorteerd. In het volgende voorbeeld worden alleen de belangrijkste 10
resultaten weergegeven:
display(df_sorted.limit(10))
DataFrames koppelen
Als u twee of meer DataFrames wilt koppelen, gebruikt u de join
methode. U kunt opgeven hoe u de DataFrames wilt samenvoegen in de how
parameters (het jointype) en on
(op welke kolommen de join moet worden gebaseerd). Veelvoorkomende jointypen zijn:
inner
: Dit is de standaardinstelling van het jointype, die een DataFrame retourneert waarmee alleen de rijen worden bijgehouden waarvoor deon
parameter in de DataFrames overeenkomt.left
: Hiermee blijven alle rijen van het eerste opgegeven DataFrame behouden en alleen rijen uit het tweede opgegeven DataFrame die overeenkomen met de eerste.outer
: Met een outer join worden alle rijen van beide DataFrames bijgehouden, ongeacht de overeenkomst.
Zie Werken met joins in Azure Databricks voor gedetailleerde informatie over joins. Zie DataFrame-joins voor een lijst met joins die worden ondersteund in PySpark.
In het volgende voorbeeld wordt één DataFrame geretourneerd waarbij elke rij van het orders
DataFrame is gekoppeld aan de bijbehorende rij van het customers
DataFrame. Een inner join wordt gebruikt, omdat de verwachting is dat elke bestelling overeenkomt met precies één klant.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
Als u wilt deelnemen aan meerdere voorwaarden, gebruikt u booleaanse operatoren zoals &
en |
om respectievelijk op te geven AND
en OR
, respectievelijk. In het volgende voorbeeld wordt een extra voorwaarde toegevoegd, waarbij alleen wordt gefilterd op de rijen die groter zijn o_totalprice
dan 500,000
:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
Geaggregeerde gegevens
Als u gegevens in een DataFrame wilt aggregeren, vergelijkbaar met een GROUP BY
in SQL, gebruikt u de groupBy
methode om kolommen op te geven waarop u wilt groeperen en de agg
methode om aggregaties op te geven. Algemene aggregaties importeren, waaronder avg
, sum
, max
en min
van pyspark.sql.functions
. In het volgende voorbeeld ziet u het gemiddelde klantsaldo per marktsegment:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
Sommige aggregaties zijn acties, wat betekent dat ze berekeningen activeren. In dit geval hoeft u geen andere acties te gebruiken om resultaten uit te voeren.
Als u rijen in een DataFrame wilt tellen, gebruikt u de count
methode:
df_customer.count()
Aanroepen koppelen
Methoden waarmee DataFrames worden getransformeerd, retourneren DataFrames en Spark handelt pas op transformaties als er acties worden aangeroepen. Deze luie evaluatie betekent dat u meerdere methoden voor gemak en leesbaarheid kunt koppelen. In het volgende voorbeeld ziet u hoe u filters, aggregatie en volgorde kunt koppelen:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
Uw DataFrame visualiseren
Als u een DataFrame in een notebook wilt visualiseren, klikt u op het + teken naast de tabel linksboven in het DataFrame en selecteert u Visualisatie om een of meer grafieken toe te voegen op basis van uw DataFrame. Zie Visualisaties in Databricks-notebooks voor meer informatie over visualisaties.
display(df_order)
Als u extra visualisaties wilt uitvoeren, raadt Databricks aan om pandas-API voor Spark te gebruiken. Hiermee .pandas_api()
kunt u casten naar de bijbehorende Pandas-API voor een Spark DataFrame. Zie Pandas-API in Spark voor meer informatie.
Uw gegevens opslaan
Zodra u uw gegevens hebt getransformeerd, kunt u deze opslaan met behulp van de DataFrameWriter
methoden. Een volledige lijst met deze methoden vindt u in DataFrameWriter. In de volgende secties ziet u hoe u uw DataFrame opslaat als een tabel en als een verzameling gegevensbestanden.
Uw DataFrame opslaan als een tabel
Als u uw DataFrame wilt opslaan als een tabel in Unity Catalog, gebruikt u de write.saveAsTable
methode en geeft u het pad op in de indeling <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
Uw DataFrame schrijven als CSV
Als u uw DataFrame wilt schrijven om deze op te *.csv
maken, gebruikt u de write.csv
methode en geeft u de indeling en opties op. Standaard als er gegevens aanwezig zijn op het opgegeven pad, mislukt de schrijfbewerking. U kunt een van de volgende modi opgeven om een andere actie uit te voeren:
overwrite
Overschrijft alle bestaande gegevens in het doelpad met de inhoud van het DataFrame.append
voegt inhoud van het DataFrame toe aan gegevens in het doelpad.ignore
de schrijfbewerking op de achtergrond mislukt als er gegevens aanwezig zijn in het doelpad.
In het volgende voorbeeld ziet u hoe u gegevens overschrijft met DataFrame-inhoud als CSV-bestanden:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
Volgende stappen
Als u meer Spark-mogelijkheden in Databricks wilt gebruiken, raadpleegt u: