Kurz: Načtení a transformace dat pomocí datových rámců Apache Sparku
V tomto kurzu se dozvíte, jak načíst a transformovat data pomocí rozhraní API datového rámce Apache Spark Python (PySpark), rozhraní API datového rámce Apache Spark Scala a rozhraní API SparkR SparkDataFrame v Azure Databricks.
Na konci tohoto kurzu pochopíte, co datový rámec je, a seznámíte se s následujícími úlohami:
Python
- Definování proměnných a kopírování veřejných dat do svazku katalogu Unity
- Vytvoření datového rámce pomocí Pythonu
- Načtení dat do datového rámce ze souboru CSV
- Zobrazení datového rámce a interakce s ním
- Uložení datového rámce
- Spouštění dotazů SQL v PySparku
Viz také referenční informace k rozhraní API Apache Spark PySpark.
Scala
- Definování proměnných a kopírování veřejných dat do svazku katalogu Unity
- Vytvoření datového rámce pomocí scaly
- Načtení dat do datového rámce ze souboru CSV
- Zobrazení datového rámce a interakce s ním
- Uložení datového rámce
- Spouštění dotazů SQL v Apache Sparku
Viz také referenční informace k rozhraní Apache Spark Scala API.
R
- Definování proměnných a kopírování veřejných dat do svazku katalogu Unity
- Vytvoření SparkR SparkDataFrames
- Načtení dat do datového rámce ze souboru CSV
- Zobrazení datového rámce a interakce s ním
- Uložení datového rámce
- Spouštění dotazů SQL v SparkR
Viz také referenční informace k rozhraní Apache SparkR API.
Co je datový rámec?
Datový rámec je dvourozměrná datová struktura s sloupci potenciálně různých typů. Datový rámec si můžete představit jako tabulku, tabulku SQL nebo slovník objektů řady. Datové rámce Apache Spark poskytují bohatou sadu funkcí (výběr sloupců, filtrování, spojení, agregace), které umožňují efektivně řešit běžné problémy s analýzou dat.
Datové rámce Apache Sparku jsou abstrakce založená na odolných distribuovaných datových sadách (RDD). Datové rámce Sparku a Spark SQL používají jednotný modul pro plánování a optimalizaci, který umožňuje získat téměř stejný výkon ve všech podporovaných jazycích v Azure Databricks (Python, SQL, Scala a R).
Požadavky
K dokončení následujícího kurzu musíte splnit následující požadavky:
Pokud chcete použít příklady v tomto kurzu, musí mít váš pracovní prostor povolený katalog Unity.
Příklady v tomto kurzu používají k ukládání ukázkových dat svazek katalogu Unity. Pokud chcete tyto příklady použít, vytvořte svazek a použijte katalog, schéma a názvy svazků k nastavení cesty svazku používané příklady.
V katalogu Unity musíte mít následující oprávnění:
READ VOLUME
aWRITE VOLUME
) neboALL PRIVILEGES
pro svazek použitý pro tento kurz.USE SCHEMA
neboALL PRIVILEGES
pro schéma použité pro tento kurz.USE CATALOG
neboALL PRIVILEGES
pro katalog použitý pro tento kurz.
Pokud chcete tato oprávnění nastavit, podívejte se na správce Databricks nebo na oprávnění katalogu Unity a zabezpečitelné objekty.
Tip
Dokončený poznámkový blok pro tento článek najdete v poznámkových blocích kurzu datového rámce.
Krok 1: Definování proměnných a načtení souboru CSV
Tento krok definuje proměnné pro použití v tomto kurzu a pak načte soubor CSV obsahující data názvu dítěte z health.data.ny.gov do svazku katalogu Unity.
Kliknutím na ikonu otevřete nový poznámkový blok. Informace o procházení poznámkových bloků Azure Databricks najdete v tématu Rozhraní a ovládací prvky poznámkového bloku Databricks.
Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Nahraďte
<catalog-name>
katalog,<volume-name>
<schema-name>
schéma a názvy svazků pro svazek katalogu Unity. Nahraďte<table_name>
zvoleným názvem tabulky. Data názvu dítěte načtete do této tabulky později v tomto kurzu.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Stisknutím spustíte
Shift+Enter
buňku a vytvoříte novou prázdnou buňku.Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód zkopíruje
rows.csv
soubor z health.data.ny.gov do svazku katalogu Unity pomocí příkazu Databricks dbutuils .Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Krok 2: Vytvoření datového rámce
Tento krok vytvoří datový rámec s testovacími df1
daty a pak zobrazí jeho obsah.
Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód vytvoří datový rámec s testovacími daty a pak zobrazí obsah a schéma datového rámce.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Krok 3: Načtení dat do datového rámce ze souboru CSV
Tento krok vytvoří datový rámec pojmenovaný df_csv
ze souboru CSV, který jste předtím načetli do svazku katalogu Unity. Viz spark.read.csv.
Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód načte data názvu dítěte do datového rámce
df_csv
ze souboru CSV a pak zobrazí obsah datového rámce.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Můžete načíst data z mnoha podporovaných formátů souborů.
Krok 4: Zobrazení datového rámce a interakce s ním
Pomocí následujících metod můžete zobrazit datové rámce s názvy dětí a pracovat s nimi.
Tisk schématu datového rámce
Zjistěte, jak zobrazit schéma datového rámce Apache Spark. Apache Spark používá schéma termínů k odkazování na názvy a datové typy sloupců v datovém rámci.
Poznámka:
Azure Databricks také používá schéma termínů k popisu kolekce tabulek zaregistrovaných v katalogu.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód ukazuje schéma datových rámců s metodou
.printSchema()
pro zobrazení schémat dvou datových rámců – pro přípravu na sjednocení těchto dvou datových rámců.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Přejmenování sloupce v datovém rámci
Zjistěte, jak přejmenovat sloupec v datovém rámci.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód přejmenuje sloupec v datovém
df1_csv
rámci tak, aby odpovídal příslušnému sloupci v datovém rámcidf1
. Tento kód používá metodu Apache SparkwithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Kombinování datových rámců
Zjistěte, jak vytvořit nový datový rámec, který přidá řádky jednoho datového rámce do druhého.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
union()
ke kombinování obsahu prvního datového rámcedf
s datovým rámcemdf_csv
obsahujícím data názvů dětí načtená ze souboru CSV.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Filtrování řádků v datovém rámci
Seznamte se s nejoblíbenějšími názvy dětí v sadě dat filtrováním řádků pomocí Apache Sparku .filter()
nebo .where()
metod. Pomocí filtrování vyberte podmnožinu řádků, které chcete vrátit nebo upravit v datovém rámci. V výkonu nebo syntaxi není žádný rozdíl, jak je vidět v následujících příkladech.
Použití metody .filter()
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
.filter()
k zobrazení těchto řádků v datovém rámci s počtem více než 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Použití metody .where()
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
.where()
k zobrazení těchto řádků v datovém rámci s počtem více než 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Výběr sloupců z datového rámce a pořadí podle frekvence
Přečtěte si, jakou frekvenci select()
názvu dítěte metoda určuje sloupce z datového rámce, které se mají vrátit. K seřazení výsledků použijte Apache Spark orderby
a desc
funkce.
Modul pyspark.sql pro Apache Spark poskytuje podporu funkcí SQL. Mezi tyto funkce, které používáme v tomto kurzu, patří Apache Spark orderBy()
, desc()
a expr()
funkce. Použití těchto funkcí povolíte tak, že je podle potřeby naimportujete do relace.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje
desc()
funkci a pak použije metodu Apache Sparkselect()
a Apache SparkorderBy()
adesc()
funkce k zobrazení nejběžnějších názvů a jejich počtu v sestupném pořadí.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Vytvoření datového rámce podmnožina
Zjistěte, jak vytvořit podmnožinu datového rámce z existujícího datového rámce.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
filter
k vytvoření nového datového rámce, který omezuje data podle roku, počtu a pohlaví. K omezení sloupců používá metodu Apache Sparkselect()
. Používá také Apache SparkorderBy()
adesc()
funkce k seřazení nového datového rámce podle počtu.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Krok 5: Uložení datového rámce
Naučte se ukládat datový rámec. Datový rámec můžete uložit do tabulky nebo zapsat datový rámec do souboru nebo do více souborů.
Uložení datového rámce do tabulky
Azure Databricks ve výchozím nastavení používá formát Delta Lake pro všechny tabulky. Pokud chcete datový rámec uložit, musíte mít CREATE
oprávnění tabulky k katalogu a schématu.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží obsah datového rámce do tabulky pomocí proměnné, kterou jste definovali na začátku tohoto kurzu.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Většina aplikací Apache Spark pracuje na velkých datových sadách a distribuovaným způsobem. Apache Spark zapisuje adresář souborů místo jednoho souboru. Delta Lake rozdělí složky a soubory Parquet. Mnoho datových systémů může tyto adresáře souborů číst. Azure Databricks doporučuje používat tabulky přes cesty k souborům pro většinu aplikací.
Uložení datového rámce do souborů JSON
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží datový rámec do adresáře souborů JSON.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Čtení datového rámce ze souboru JSON
Naučte se používat metodu Apache Spark spark.read.format()
ke čtení dat JSON z adresáře do datového rámce.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód zobrazí soubory JSON, které jste uložili v předchozím příkladu.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Další úlohy: Spouštění dotazů SQL v PySpark, Scala a R
Datové rámce Apache Spark nabízejí následující možnosti pro kombinování SQL s PySpark, Scala a R. Následující kód můžete spustit ve stejném poznámkovém bloku, který jste vytvořili pro účely tohoto kurzu.
Zadání sloupce jako dotazu SQL
Naučte se používat metodu Apache Spark selectExpr()
. Jedná se o variantu select()
metody, která přijímá výrazy SQL a vrací aktualizovaný datový rámec. Tato metoda umožňuje použít výraz SQL, například upper
.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark
selectExpr()
a výraz SQLupper
k převodu sloupce řetězce na velká písmena (a přejmenování sloupce).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Použití expr()
syntaxe SQL pro sloupec
Naučte se importovat a používat funkci Apache Spark expr()
k použití syntaxe SQL kdekoli, kde by byl zadaný sloupec.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje
expr()
funkci a pak použije funkci Apache Sparkexpr()
a výraz SQLlower
k převodu sloupce řetězce na malá písmena (a přejmenování sloupce).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Spuštění libovolného dotazu SQL pomocí funkce spark.sql()
Naučte se používat funkci Apache Spark spark.sql()
ke spouštění libovolných dotazů SQL.
Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá funkci Apache Spark
spark.sql()
k dotazování tabulky SQL pomocí syntaxe SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Stisknutím klávesy
Shift+Enter
spusťte buňku a přejděte na další buňku.
Poznámkové bloky k datovým rámcům
Následující poznámkové bloky obsahují příklady dotazů z tohoto kurzu.