Delen via


Basisbeginselen van PySpark

In dit artikel worden eenvoudige voorbeelden beschreven om het gebruik van PySpark te illustreren. Hierbij wordt ervan uitgegaan dat u de basisconcepten van Apache Spark begrijpt en opdrachten uitvoert in een Azure Databricks-notebook dat is verbonden met compute. U maakt DataFrames met voorbeeldgegevens, voert basistransformaties uit, inclusief rij- en kolombewerkingen op deze gegevens, combineert meerdere DataFrames en aggregeert deze gegevens, visualiseert deze gegevens en slaat deze vervolgens op in een tabel of bestand.

Gegevens uploaden

In sommige voorbeelden in dit artikel worden voorbeeldgegevens van Databricks gebruikt om te laten zien hoe u DataFrames gebruikt om gegevens te laden, transformeren en opslaan. Als u uw eigen gegevens wilt gebruiken die nog niet in Databricks aanwezig zijn, kunt u deze eerst uploaden en er een DataFrame van maken. Zie Een tabel maken of wijzigen met behulp van het uploaden en uploaden van bestanden naar een Unity Catalog-volume.

Over Databricks-voorbeeldgegevens

Databricks biedt voorbeeldgegevens in de samples catalogus en in de /databricks-datasets map.

  • Gebruik de indeling samples.<schema-name>.<table-name>om toegang te krijgen tot de voorbeeldgegevens in de samples catalogus. In dit artikel worden tabellen in het samples.tpch schema gebruikt die gegevens uit een fictief bedrijf bevatten. De customer tabel bevat informatie over klanten en orders bevat informatie over orders die door deze klanten zijn geplaatst.
  • Gebruiken dbutils.fs.ls om gegevens in /databricks-datasetste verkennen. Gebruik Spark SQL of DataFrames om gegevens op deze locatie op te vragen met behulp van bestandspaden. Zie Voorbeeldgegevenssets voor meer informatie over door Databricks geleverde voorbeeldgegevens.

Gegevenstypen importeren

Voor veel PySpark-bewerkingen moet u SQL-functies gebruiken of communiceren met systeemeigen Spark-typen. U kunt alleen de functies en typen die u nodig hebt rechtstreeks importeren, of u kunt de hele module importeren.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Omdat sommige geïmporteerde functies mogelijk ingebouwde Python-functies overschrijven, kiezen sommige gebruikers ervoor om deze modules te importeren met behulp van een alias. In de volgende voorbeelden ziet u een algemene alias die wordt gebruikt in Apache Spark-codevoorbeelden:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Zie Spark-gegevenstypen voor een uitgebreide lijst met gegevenstypen.

Zie Spark Functions voor een uitgebreide lijst met PySpark SQL-functies.

Een DataFrame maken

Er zijn verschillende manieren om een DataFrame te maken. Meestal definieert u een DataFrame op basis van een gegevensbron, zoals een tabel of verzameling bestanden. Gebruik vervolgens, zoals beschreven in de sectie basisconcepten van Apache Spark, een actie, zoals display, om de transformaties te activeren die moeten worden uitgevoerd. De display methode voert DataFrames uit.

Een DataFrame met opgegeven waarden maken

Als u een DataFrame met opgegeven waarden wilt maken, gebruikt u de createDataFrame methode, waarbij rijen worden uitgedrukt als een lijst met tuples:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

U ziet in de uitvoer dat de gegevenstypen van kolommen df_children automatisch worden afgeleid. U kunt de typen ook opgeven door een schema toe te voegen. Schema's worden gedefinieerd met behulp van StructFields de StructType opgegeven naam, het gegevenstype en een booleaanse vlag die aangeeft of ze een null-waarde bevatten of niet. U moet gegevenstypen importeren uit pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Een DataFrame maken op basis van een tabel in Unity Catalog

Als u een DataFrame wilt maken op basis van een tabel in Unity Catalog, gebruikt u de table methode voor het identificeren van de tabel met de indeling <catalog-name>.<schema-name>.<table-name>. Klik op Catalogus op de linkernavigatiebalk om Catalogusverkenner te gebruiken om naar uw tabel te navigeren. Klik erop en selecteer Vervolgens het pad naar de tabel kopiëren om het tabelpad in het notitieblok in te voegen.

In het volgende voorbeeld wordt de tabel samples.tpch.customergeladen, maar u kunt ook het pad naar uw eigen tabel opgeven.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Een DataFrame maken op basis van een geüpload bestand

Als u een DataFrame wilt maken van een bestand dat u hebt geüpload naar Unity Catalog-volumes, gebruikt u de read eigenschap. Deze methode retourneert een DataFrameReader, die u vervolgens kunt gebruiken om de juiste indeling te lezen. Klik op de catalogusoptie op de kleine zijbalk aan de linkerkant en gebruik de catalogusbrowser om uw bestand te zoeken. Selecteer deze en klik vervolgens op Pad naar volumebestand kopiëren.

Het onderstaande voorbeeld leest uit een *.csv bestand, maar DataFrameReader ondersteunt het uploaden van bestanden in veel andere indelingen. Zie DataFrameReader-methoden.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Zie Wat zijn Unity Catalog-volumes? voor meer informatie over Unity Catalog-volumes.

Een DataFrame maken op basis van een JSON-antwoord

Als u een DataFrame wilt maken op basis van een nettolading van een JSON-antwoord die wordt geretourneerd door een REST API, gebruikt u het Python-pakket requests om een query uit te voeren en het antwoord te parseren. U moet het pakket importeren om het te gebruiken. In dit voorbeeld worden gegevens uit de database van de Verenigde Staten Food and Drug Administration-toepassingstoepassing gebruikt.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Zie Semi-gestructureerde modelgegevens voor informatie over het werken met JSON en andere semi-gestructureerde gegevens in Databricks.

Een JSON-veld of -object selecteren

Als u een specifiek veld of object wilt selecteren in de geconverteerde JSON, gebruikt u de [] notatie. Als u bijvoorbeeld het products veld wilt selecteren dat zelf een matrix met producten is:

display(df_drugs.select(df_drugs["products"]))

U kunt ook methode-aanroepen koppelen om meerdere velden te doorlopen. Als u bijvoorbeeld de merknaam van het eerste product in een drugtoepassing wilt uitvoeren:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Een DataFrame maken op basis van een bestand

Als u wilt laten zien hoe u een DataFrame maakt op basis van een bestand, worden in dit voorbeeld CSV-gegevens in de /databricks-datasets map geladen.

Als u naar de voorbeeldgegevenssets wilt navigeren, kunt u de databricks Utilties-bestandssysteemopdrachten gebruiken. In het volgende voorbeeld worden dbutils de gegevenssets vermeld die beschikbaar zijn in /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

U kunt ook toegang krijgen %fs tot databricks CLI-bestandssysteemopdrachten, zoals wordt weergegeven in het volgende voorbeeld:

%fs ls '/databricks-datasets'

Als u een DataFrame wilt maken op basis van een bestand of map met bestanden, geeft u het pad op in de load methode:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Gegevens transformeren met DataFrames

Met DataFrames kunt u eenvoudig gegevens transformeren met behulp van ingebouwde methoden om gegevens te sorteren, filteren en aggregeren. Veel transformaties worden niet opgegeven als methoden in DataFrames, maar worden in plaats daarvan geleverd in het spark.sql.functions pakket. Zie Databricks Spark SQL Functions.

Kolombewerkingen

Spark biedt veel basiskolombewerkingen:

Tip

Als u alle kolommen in een DataFrame wilt uitvoeren, gebruikt columnsu bijvoorbeeld df_customer.columns.

Kolommen selecteren

U kunt specifieke kolommen selecteren met en select col. De col functie bevindt zich in de pyspark.sql.functions submodule.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

U kunt ook verwijzen naar een kolom die expr een expressie gebruikt die is gedefinieerd als een tekenreeks:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

U kunt ook SQL-expressies gebruiken selectExpr:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Ga als volgt te werk om kolommen te selecteren met een letterlijke tekenreeks:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Als u expliciet een kolom uit een specifiek DataFrame wilt selecteren, kunt u de [] operator of de . operator gebruiken. (De . operator kan niet worden gebruikt om kolommen te selecteren die beginnen met een geheel getal of kolommen die een spatie of speciaal teken bevatten.) Dit kan met name handig zijn wanneer u dataframes koppelt waarbij sommige kolommen dezelfde naam hebben.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Kolommen maken

Als u een nieuwe kolom wilt maken, gebruikt u de withColumn methode. In het volgende voorbeeld wordt een nieuwe kolom gemaakt die een Booleaanse waarde bevat op basis van het feit of het saldo c_acctbal van de klantrekening groter is 1000dan:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Namen van kolommen wijzigen

Als u de naam van een kolom wilt wijzigen, gebruikt u de withColumnRenamed methode die de bestaande en nieuwe kolomnamen accepteert:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

De alias methode is vooral handig als u de naam van uw kolommen wilt wijzigen als onderdeel van aggregaties:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Cast-kolomtypen

In sommige gevallen kunt u het gegevenstype voor een of meer kolommen in uw DataFrame wijzigen. Hiervoor gebruikt u de cast methode om te converteren tussen kolomgegevenstypen. In het volgende voorbeeld ziet u hoe u een kolom converteert van een geheel getal naar een tekenreekstype, met behulp van de col methode om te verwijzen naar een kolom:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Kolommen verwijderen

Als u kolommen wilt verwijderen, kunt u kolommen weglaten tijdens een selectie of select(*) except kunt u de drop methode gebruiken:

df_customer_flag_renamed.drop("balance_flag_renamed")

U kunt ook meerdere kolommen tegelijk verwijderen:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Rijbewerkingen

Spark biedt veel basisrijbewerkingen:

Rijen filteren

Als u rijen wilt filteren, gebruikt u de filter of where methode op een DataFrame om alleen bepaalde rijen te retourneren. Als u een kolom wilt identificeren waarop u wilt filteren, gebruikt u de col methode of een expressie die resulteert in een kolom.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Als u wilt filteren op meerdere voorwaarden, gebruikt u logische operators. U kunt bijvoorbeeld & | voorwaarden en OR voorwaarden instellenAND. In het volgende voorbeeld worden rijen gefilterd waarbij het c_nationkey gelijk is aan 20 en c_acctbal groter is dan 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Dubbele rijen verwijderen

Als u dubbele rijen wilt verwijderen, gebruikt distinctu deze functie, die alleen de unieke rijen retourneert.

df_unique = df_customer.distinct()

Null-waarden verwerken

Als u null-waarden wilt verwerken, verwijdert u rijen die null-waarden bevatten met behulp van de na.drop methode. Met deze methode kunt u opgeven of u rijen any met null-waarden of all null-waarden wilt verwijderen.

Als u null-waarden wilt verwijderen, gebruikt u een van de volgende voorbeelden.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Als u in plaats daarvan alleen rijen wilt filteren die alle null-waarden bevatten, gebruikt u het volgende:

df_customer_no_nulls = df_customer.na.drop("all")

U kunt dit toepassen op een subset kolommen door dit op te geven, zoals hieronder wordt weergegeven:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Gebruik de fill methode om ontbrekende waarden in te vullen. U kunt ervoor kiezen om dit toe te passen op alle kolommen of een subset van kolommen. In het onderstaande voorbeeld worden de accountsaldo's met een null-waarde voor hun rekeningsaldo c_acctbal gevuld 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Gebruik de replace methode om tekenreeksen te vervangen door andere waarden. In het onderstaande voorbeeld worden lege adrestekenreeksen vervangen door het woord UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Rijen toevoegen

Als u rijen wilt toevoegen, moet u de union methode gebruiken om een nieuw DataFrame te maken. In het volgende voorbeeld wordt het DataFrame df_that_one_customer dat eerder is gemaakt en df_filtered_customer gecombineerd, waarmee een DataFrame met drie klanten wordt geretourneerd:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Notitie

U kunt DataFrames ook combineren door ze naar een tabel te schrijven en vervolgens nieuwe rijen toe te voegen. Voor productieworkloads kan incrementele verwerking van gegevensbronnen naar een doeltabel de latentie en de rekenkosten drastisch verminderen naarmate de gegevens groter worden. Zie Gegevens opnemen in een Databricks Lakehouse.

Rijen sorteren

Belangrijk

Sorteren kan duur zijn op schaal en als u gesorteerde gegevens opslaat en de gegevens opnieuw laadt met Spark, is de volgorde niet gegarandeerd. Zorg ervoor dat u opzettelijk sorteert.

Als u rijen wilt sorteren op een of meer kolommen, gebruikt u de sort of orderBy methode. Deze methoden worden standaard in oplopende volgorde gesorteerd:

df_customer.orderBy(col("c_acctbal"))

Als u wilt filteren in aflopende volgorde, gebruikt u desc:

df_customer.sort(col("c_custkey").desc())

In het volgende voorbeeld ziet u hoe u kunt sorteren op twee kolommen:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Gebruik de limit methode om het aantal rijen te beperken dat moet worden geretourneerd zodra het DataFrame is gesorteerd. In het volgende voorbeeld worden alleen de belangrijkste 10 resultaten weergegeven:

display(df_sorted.limit(10))

DataFrames koppelen

Als u twee of meer DataFrames wilt koppelen, gebruikt u de join methode. U kunt opgeven hoe u de DataFrames wilt samenvoegen in de how parameters (het jointype) en on (op welke kolommen de join moet worden gebaseerd). Veelvoorkomende jointypen zijn:

  • inner: Dit is de standaardinstelling van het jointype, die een DataFrame retourneert waarmee alleen de rijen worden bijgehouden waarvoor de on parameter in de DataFrames overeenkomt.
  • left: Hiermee blijven alle rijen van het eerste opgegeven DataFrame behouden en alleen rijen uit het tweede opgegeven DataFrame die overeenkomen met de eerste.
  • outer: Met een outer join worden alle rijen van beide DataFrames bijgehouden, ongeacht de overeenkomst.

Zie Werken met joins in Azure Databricks voor gedetailleerde informatie over joins. Zie DataFrame-joins voor een lijst met joins die worden ondersteund in PySpark.

In het volgende voorbeeld wordt één DataFrame geretourneerd waarbij elke rij van het orders DataFrame is gekoppeld aan de bijbehorende rij van het customers DataFrame. Een inner join wordt gebruikt, omdat de verwachting is dat elke bestelling overeenkomt met precies één klant.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Als u wilt deelnemen aan meerdere voorwaarden, gebruikt u booleaanse operatoren zoals & en | om respectievelijk op te geven AND en OR, respectievelijk. In het volgende voorbeeld wordt een extra voorwaarde toegevoegd, waarbij alleen wordt gefilterd op de rijen die groter zijn o_totalprice dan 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Geaggregeerde gegevens

Als u gegevens in een DataFrame wilt aggregeren, vergelijkbaar met een GROUP BY in SQL, gebruikt u de groupBy methode om kolommen op te geven waarop u wilt groeperen en de agg methode om aggregaties op te geven. Algemene aggregaties importeren, waaronder avg, sum, maxen min van pyspark.sql.functions. In het volgende voorbeeld ziet u het gemiddelde klantsaldo per marktsegment:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Sommige aggregaties zijn acties, wat betekent dat ze berekeningen activeren. In dit geval hoeft u geen andere acties te gebruiken om resultaten uit te voeren.

Als u rijen in een DataFrame wilt tellen, gebruikt u de count methode:

df_customer.count()

Aanroepen koppelen

Methoden waarmee DataFrames worden getransformeerd, retourneren DataFrames en Spark handelt pas op transformaties als er acties worden aangeroepen. Deze luie evaluatie betekent dat u meerdere methoden voor gemak en leesbaarheid kunt koppelen. In het volgende voorbeeld ziet u hoe u filters, aggregatie en volgorde kunt koppelen:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Uw DataFrame visualiseren

Als u een DataFrame in een notebook wilt visualiseren, klikt u op het + teken naast de tabel linksboven in het DataFrame en selecteert u Visualisatie om een of meer grafieken toe te voegen op basis van uw DataFrame. Zie Visualisaties in Databricks-notebooks voor meer informatie over visualisaties.

display(df_order)

Als u extra visualisaties wilt uitvoeren, raadt Databricks aan om pandas-API voor Spark te gebruiken. Hiermee .pandas_api() kunt u casten naar de bijbehorende Pandas-API voor een Spark DataFrame. Zie Pandas-API in Spark voor meer informatie.

Uw gegevens opslaan

Zodra u uw gegevens hebt getransformeerd, kunt u deze opslaan met behulp van de DataFrameWriter methoden. Een volledige lijst met deze methoden vindt u in DataFrameWriter. In de volgende secties ziet u hoe u uw DataFrame opslaat als een tabel en als een verzameling gegevensbestanden.

Uw DataFrame opslaan als een tabel

Als u uw DataFrame wilt opslaan als een tabel in Unity Catalog, gebruikt u de write.saveAsTable methode en geeft u het pad op in de indeling <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Uw DataFrame schrijven als CSV

Als u uw DataFrame wilt schrijven om deze op te *.csv maken, gebruikt u de write.csv methode en geeft u de indeling en opties op. Standaard als er gegevens aanwezig zijn op het opgegeven pad, mislukt de schrijfbewerking. U kunt een van de volgende modi opgeven om een andere actie uit te voeren:

  • overwrite Overschrijft alle bestaande gegevens in het doelpad met de inhoud van het DataFrame.
  • append voegt inhoud van het DataFrame toe aan gegevens in het doelpad.
  • ignore de schrijfbewerking op de achtergrond mislukt als er gegevens aanwezig zijn in het doelpad.

In het volgende voorbeeld ziet u hoe u gegevens overschrijft met DataFrame-inhoud als CSV-bestanden:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Volgende stappen

Als u meer Spark-mogelijkheden in Databricks wilt gebruiken, raadpleegt u: