Zelfstudie: Gegevens laden en transformeren met Apache Spark DataFrames
In deze zelfstudie leert u hoe u gegevens laadt en transformeert met behulp van de DataFrame-API van Apache Spark (PySpark), de Apache Spark Scala DataFrame-API en de SparkR SparkDataFrame-API in Azure Databricks.
Aan het einde van deze zelfstudie begrijpt u wat een DataFrame is en vertrouwd bent met de volgende taken:
Python
- Variabelen definiëren en openbare gegevens kopiëren naar een Unity-Catalog volume
- Een DataFrame maken met Python
- Gegevens laden in een DataFrame vanuit een CSV-bestand
- Een DataFrame weergeven en ermee werken
- Het DataFrame opslaan
- SQL-query's uitvoeren in PySpark
Zie ook naslaginformatie over de Apache Spark PySpark-API.
Scala
- Variabelen definiëren en openbare gegevens kopiëren naar een Unity-Catalog volume
- Een DataFrame maken met Scala
- Gegevens laden in een DataFrame vanuit een CSV-bestand
- Een DataFrame weergeven en ermee werken
- Het DataFrame opslaan
- SQL-query's uitvoeren in Apache Spark
Zie ook de Naslaginformatie over de Scala-API van Apache Spark.
R
- Variabelen definiëren en openbare gegevens kopiëren naar een Unity-Catalog volume
- Een SparkR SparkDataFrames maken
- Gegevens laden in een DataFrame vanuit een CSV-bestand
- Een DataFrame weergeven en ermee werken
- Het DataFrame opslaan
- SQL-query's uitvoeren in SparkR
Zie ook naslaginformatie over de Apache SparkR-API.
Wat is een DataFrame?
Een DataFrame is een tweedimensionale gelabelde gegevensstructuur met columns van mogelijk verschillende typen. U kunt een DataFrame beschouwen als een spreadsheet, een SQL-tableof een woordenlijst met reeksobjecten. Apache Spark DataFrames bieden een uitgebreide set van functies (selectcolumns, filter, join, aggregaties) waarmee u veelvoorkomende problemen met gegevensanalyse efficiënt kunt oplossen.
Apache Spark DataFrames zijn een abstractie die is gebouwd op RDD's (Resilient Distributed Datasets). Spark DataFrames en Spark SQL maken gebruik van een geïntegreerde plannings- en optimalisatie-engine, zodat u bijna identieke prestaties kunt get in alle ondersteunde talen in Azure Databricks (Python, SQL, Scala en R).
Vereisten
Als u de volgende zelfstudie wilt voltooien, moet u voldoen aan de volgende vereisten:
Als u de voorbeelden in deze handleiding wilt gebruiken, moet uw werkruimte Unity Catalog ingeschakeld zijn.
In de voorbeelden in deze zelfstudie wordt een Unity-Catalog-volume gebruikt om voorbeeldgegevens op te slaan. Als u deze voorbeelden wilt gebruiken, maakt u een volume en gebruikt u de catalogvan dat volume, schemaen volumenamen om het volumepad te set dat door de voorbeelden wordt gebruikt.
U moet de volgende machtigingen hebben in Unity Catalog:
-
READ VOLUME
enWRITE VOLUME
, ofALL PRIVILEGES
voor het volume dat voor deze zelfstudie wordt gebruikt. -
USE SCHEMA
ofALL PRIVILEGES
voor de schema die voor deze tutorial wordt gebruikt. -
USE CATALOG
ofALL PRIVILEGES
voor de catalog die voor deze instructie wordt gebruikt.
Als u deze machtigingen wilt set, raadpleegt u uw Databricks-beheerder of Unity-Catalog-bevoegdheden en beveiligbare objecten.
-
Tip
Zie DataFrame-zelfstudienotebooks voor een voltooid notebook voor dit artikel.
Stap 1: Variabelen definiëren en CSV-bestand laden
Deze stap definieert variabelen voor gebruik in deze zelfstudie en laadt vervolgens een CSV-bestand met babynaamgegevens van health.data.ny.gov in uw Unity Catalog volume.
Open een nieuw notitieblok door op het pictogram te klikken. Zie de Interface en besturingselementen voor Databricks-notebooks voor meer informatie over het navigeren in Azure Databricks-notebooks.
Kopieer en plak de volgende code in de nieuwe lege notebookcel. Vervang
<catalog-name>
,<schema-name>
en<volume-name>
door de catalog, schemaen volumenamen voor een Unity-Catalog volume. Vervang<table_name>
door een table naam van uw keuze. Later in deze handleiding zult u gegevens van babynamen in deze table laden.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Druk
Shift+Enter
om de cel uit te voeren en een nieuwe lege cel te maken.Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code kopieert u het
rows.csv
-bestand van health.data.ny.gov naar uw Unity Catalog volume met behulp van de opdracht Databricks dbutuils.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Stap 2: Een DataFrame maken
Met deze stap maakt u een DataFrame met de naam df1
testgegevens en wordt vervolgens de inhoud ervan weergegeven.
Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code maakt u het DataFrame met testgegevens en geeft u vervolgens de inhoud en de schema van het DataFrame weer.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Stap 3: Gegevens in een DataFrame laden vanuit een CSV-bestand
Met deze stap maakt u een DataFrame met de naam df_csv
van het CSV-bestand dat u eerder in uw Unity Catalog volume hebt geladen. Zie spark.read.csv.
Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code worden babynaamgegevens vanuit het CSV-bestand in DataFrame
df_csv
geladen en wordt vervolgens de inhoud van het DataFrame weergegeven.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
U kunt gegevens laden uit een groot aantal ondersteunde bestandsindelingen.
Stap 4: Uw DataFrame weergeven en ermee werken
Bekijk en communiceer met uw babynamen DataFrames met behulp van de volgende methoden.
De DataFrame-schema afdrukken
Meer informatie over het weergeven van de schema van een Apache Spark DataFrame. Apache Spark gebruikt de term schema om te verwijzen naar de namen en gegevenstypen van de columns in het DataFrame.
Notitie
Azure Databricks gebruikt ook de term schema om een verzameling tables te beschrijven die is geregistreerd bij een catalog.
Kopieer en plak de volgende code in een lege notebookcel. Deze code toont de schema van uw DataFrames met de
.printSchema()
-methode om de schema's van de twee DataFrames weer te geven en voor te bereiden om de twee DataFrames te combineren.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Naam van column wijzigen in het DataFrame
Meer informatie over het wijzigen van de naam van een column in een DataFrame.
Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de naam van een column in het
df1_csv
DataFrame aangepast aan de respectieve column in hetdf1
DataFrame. Deze code maakt gebruik van de Apache Spark-methodewithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
DataFrames combineren
Leer hoe u een nieuw DataFrame maakt waarmee de rijen van het ene DataFrame aan het andere worden toegevoegd.
Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode
union()
om de inhoud van uw eerste DataFramedf
te combineren met DataFramedf_csv
met de babynamen die zijn geladen vanuit het CSV-bestand.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Rijen filteren in een DataFrame
Ontdek de populairste babynamen in uw gegevens set door rijen te filteren met behulp van de Apache Spark-.filter()
of .where()
methoden. Filter gebruiken om een subset van rijen te select om een DataFrame terug te geven of te wijzigen. Er is geen verschil in prestaties of syntaxis, zoals te zien is in de volgende voorbeelden.
Methode .filter() gebruiken
Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode
.filter()
om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Gebruik de methodewhere()
Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode
.where()
om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Select columns van een DataFrame en volgorde per frequentie
Ontdek welke babynaamfrequentie met de select()
-methode moet worden opgegeven om de columns van het DataFrame te retourneren. Gebruik Apache Spark orderby
en desc
functies om de resultaten te ordenen.
De pyspark.sql-module voor Apache Spark biedt ondersteuning voor SQL-functies. Een van deze functies die we in deze zelfstudie gebruiken, zijn de Apache Spark orderBy()
en desc()
expr()
functies. U schakelt het gebruik van deze functies in door ze indien nodig in uw sessie te importeren.
Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de
desc()
functie geïmporteerd en vervolgens de Apache Spark-methodeselect()
en Apache SparkorderBy()
endesc()
functies gebruikt om de meest voorkomende namen en hun aantallen in aflopende volgorde weer te geven.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Een subset DataFrame maken
Meer informatie over het maken van een subset DataFrame op basis van een bestaand DataFrame.
Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode
filter
om een nieuw DataFrame te maken dat de gegevens per jaar, aantal en geslacht beperkt. De Apache Spark-methodeselect()
wordt gebruikt om de columnste limit. Het maakt ook gebruik van Apache SparkorderBy()
endesc()
functies om het nieuwe DataFrame te sorteren op aantal.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Stap 5: Het DataFrame opslaan
Meer informatie over het opslaan van een DataFrame. U kunt uw DataFrame opslaan in een table of het DataFrame naar een bestand of meerdere bestanden schrijven.
Sla het DataFrame op in een table
Azure Databricks maakt standaard gebruik van de Delta Lake-indeling voor alle tables. Als u uw DataFrame wilt opslaan, moet u CREATE
table bevoegdheden hebben voor de catalog en schema.
Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de inhoud van het DataFrame opgeslagen in een table met behulp van de variabele die u aan het begin van deze zelfstudie hebt gedefinieerd.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
De meeste Apache Spark-toepassingen werken op grote gegevenssets en op gedistribueerde wijze. Apache Spark schrijft een map met bestanden uit in plaats van één bestand. Delta Lake splitst de Parquet-mappen en -bestanden. Veel gegevenssystemen kunnen deze mappen met bestanden lezen. Azure Databricks raadt het gebruik van tables aan via bestandspaden voor de meeste toepassingen.
Het DataFrame opslaan in JSON-bestanden
Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt het DataFrame opgeslagen in een map met JSON-bestanden.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Het DataFrame lezen uit een JSON-bestand
Meer informatie over het gebruik van de Apache Spark-methode spark.read.format()
om JSON-gegevens uit een map te lezen in een DataFrame.
Kopieer en plak de volgende code in een lege notebookcel. Met deze code worden de JSON-bestanden weergegeven die u in het vorige voorbeeld hebt opgeslagen.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Aanvullende taken: SQL-query's uitvoeren in PySpark, Scala en R
Apache Spark DataFrames bieden de volgende opties om SQL te combineren met PySpark, Scala en R. U kunt de volgende code uitvoeren in hetzelfde notebook dat u voor deze zelfstudie hebt gemaakt.
Een column opgeven als een SQL-query
Meer informatie over het gebruik van de Apache Spark-methode selectExpr()
. Dit is een variant van de select()
methode die SQL-expressies accepteert en een bijgewerkt DataFrame retourneert. Met deze methode kunt u een SQL-expressie gebruiken, zoals upper
.
Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode
selectExpr()
en de SQLupper
-expressie om een tekenreeks column te converteren naar hoofdletters (en de naam van de columnte wijzigen).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Gebruik expr()
om sql-syntaxis te gebruiken voor een column
Meer informatie over het importeren en gebruiken van de Apache Spark-expr()
-functie voor het gebruik van SQL-syntaxis overal waar een column wordt opgegeven.
Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de
expr()
-functie geïmporteerd en wordt vervolgens de Apache Spark-expr()
-functie en de SQL-lower
-expressie gebruikt om een tekenreeks column te converteren naar kleine letters (en de naam van de columnte wijzigen).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Een willekeurige SQL-query uitvoeren met behulp van de functie spark.sql()
Meer informatie over het gebruik van de Apache Spark-functie spark.sql()
om willekeurige SQL-query's uit te voeren.
Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-
spark.sql()
-functie om een query uit te voeren op een SQL-table met behulp van SQL-syntaxis.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Druk
Shift+Enter
om de cel uit te voeren en naar de volgende cel te gaan.
Zelfstudienotebooks voor DataFrame
De volgende notebooks bevatten de voorbeeldenquery's uit deze zelfstudie.