Руководство. Загрузка и преобразование данных с помощью кадров данных Apache Spark
В этом руководстве показано, как загружать и преобразовывать данные с помощью API Кадра данных Apache Spark (PySpark), API Apache Spark Scala DataFrame и API SparkR SparkDataFrame в Azure Databricks.
В конце этого руководства вы узнаете, что такое кадр данных, и ознакомится со следующими задачами:
Python
- Определение переменных и копирование общедоступных данных в том каталога Unity
- Создание кадра данных с помощью Python
- Загрузка данных в кадр данных из CSV-файла
- Просмотр и взаимодействие с кадром данных
- Сохранение кадра данных
- Выполнение запросов SQL в PySpark
См. также справочник по API Apache Spark PySpark.
Scala
- Определение переменных и копирование общедоступных данных в том каталога Unity
- Создание кадра данных с помощью Scala
- Загрузка данных в кадр данных из CSV-файла
- Просмотр и взаимодействие с кадром данных
- Сохранение кадра данных
- Выполнение запросов SQL в Apache Spark
См. также справочник по API Apache Spark Scala.
R
- Определение переменных и копирование общедоступных данных в том каталога Unity
- Создание объектов SparkR SparkDataFrames
- Загрузка данных в кадр данных из CSV-файла
- Просмотр и взаимодействие с кадром данных
- Сохранение кадра данных
- Выполнение запросов SQL в SparkR
См. также справочник по API Apache SparkR.
Что такое кадр данных?
Кадр данных — это двухмерная структура данных с столбцами потенциально разных типов. Вы можете представить DataFrame как электронную таблицу, таблицу SQL или словарь объектов Series. DataFrame Apache Spark предоставляют широкий набор функций (выбор столбцов, фильтрация, соединение, агрегирование), которые позволяют эффективно решать типичные задачи анализа данных.
Кадры данных Apache Spark — это абстракция, созданная на основе устойчивых распределенных наборов данных (RDD). Кадры данных Spark и Spark SQL используют единый механизм планирования и оптимизации, что позволяет получить почти одинаковую производительность на всех поддерживаемых языках в Azure Databricks (Python, SQL, Scala и R).
Требования
Чтобы выполнить следующее руководство, необходимо выполнить следующие требования:
Чтобы воспользоваться примерами, приведенными в этом руководстве, в рабочей области должна быть включена функция Unity Catalog.
В примерах этого руководства используется тома каталога Unity для хранения примеров данных. Чтобы использовать эти примеры, создайте том и используйте каталог, схему и имена томов, чтобы задать путь тома, используемый примерами.
У вас должны быть следующие разрешения в каталоге Unity:
-
READ VOLUME
иWRITE VOLUME
( илиALL PRIVILEGES
для тома, используемого для этого руководства. -
USE SCHEMA
илиALL PRIVILEGES
для схемы, используемой в этом руководстве. - В этом руководстве используется каталог
USE CATALOG
илиALL PRIVILEGES
.
Чтобы задать эти разрешения, обратитесь к администратору Databricks или привилегиям каталога Unity и защищаемым объектам.
-
Совет
Полный записной книжки для этой статьи см . в записных книжках учебников dataFrame.
Шаг 1. Определение переменных и загрузка CSV-файла
Этот шаг определяет переменные для использования в этом руководстве, а затем загружает CSV-файл, содержащий данные имени ребенка из health.data.ny.gov в ваш том Unity Catalog.
Откройте новую записную книжку, щелкнув значок. Сведения о том, как перемещаться по записным книжкам Azure Databricks, см. в статье Настройка внешнего вида записной книжки.
Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Замените
<catalog-name>
,<schema-name>
и<volume-name>
каталогом, схемой и именами томов для тома каталога Unity. Замените<table_name>
на имя таблицы по вашему выбору. Вы загрузите данные имени ребенка в эту таблицу далее в этом руководстве.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
Нажмите
Shift+Enter
, чтобы запустить ячейку и создать пустую ячейку.Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код копирует файл
rows.csv
из health.data.ny.gov в том каталога Unity с помощью команды Databricks dbutuils.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Шаг 2. Создание кадра данных
На этом шаге создается кадр данных с именами df1
тестов, а затем отображается его содержимое.
Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код создает кадр данных с тестируемыми данными, а затем отображает содержимое и схему кадра данных.
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Шаг 3. Загрузка данных в кадр данных из CSV-файла
На этом шаге создается DataFrame с именем df_csv
из CSV-файла, который вы ранее загружали в объем Unity Catalog. См . spark.read.csv.
Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код загружает данные имени ребенка в кадр данных
df_csv
из CSV-файла, а затем отображает содержимое кадра данных.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Данные можно загрузить из многих поддерживаемых форматов файлов.
Шаг 4. Просмотр и взаимодействие с кадром данных
Просмотр и взаимодействие с кадрами данных ребенка с помощью следующих методов.
Печать схемы Кадра данных
Узнайте, как отобразить схему DataFrame в Apache Spark. Apache Spark использует термин схема, чтобы обозначить имена и типы данных столбцов в DataFrame.
Примечание.
Azure Databricks также использует схему терминов для описания коллекции таблиц, зарегистрированных в каталоге.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. В этом коде показана схема кадров данных с помощью метода
.printSchema()
для просмотра схем двух кадров данных для подготовки к объединению двух кадров данных.Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Переименовать столбец в DataFrame
Узнайте, как переименовать столбец в DataFrame.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код переименовывает столбец в кадре данных
df1_csv
для сопоставления соответствующего столбца вdf1
кадре данных. Этот код использует метод Apache SparkwithColumnRenamed()
.Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Объединение кадров данных
Узнайте, как создать новый кадр данных, который добавляет строки одного кадра данных в другой.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark
union()
для объединения содержимого первого кадраdf
данных с кадром данныхdf_csv
, содержащим данные дочерних имен, загруженные из CSV-файла.Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Фильтрация строк в кадре данных
Узнайте о самых популярных именах детей в наборе данных, отфильтровав строки, используя методы Apache Spark .filter()
или .where()
. Используйте фильтрацию, чтобы выбрать подмножество строк для возврата или изменения в DataFrame. Нет различий в производительности или синтаксисе, как показано в следующих примерах.
Использование метода .filter()
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark
.filter()
для отображения этих строк в кадре данных с числом более 50.Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Использование метода .where()
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark
.where()
для отображения этих строк в кадре данных с числом более 50.Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Выберите столбцы из таблицы данных и упорядочьте по частоте
Узнайте о том, какие имена для детей встречаются чаще, с помощью метода select()
, чтобы указать столбцы DataFrame, которые необходимо вернуть. Используйте Apache Spark orderby
и desc
функции для упорядочивания результатов.
Модуль pyspark.sql для Apache Spark обеспечивает поддержку функций SQL. Среди этих функций, которые мы используем в этом руководстве, являются Apache Spark orderBy()
desc()
и expr()
функции. Вы можете использовать эти функции, импортируя их в сеанс по мере необходимости.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код импортирует
desc()
функцию, а затем использует метод Apache Sparkselect()
orderBy()
desc()
и функции Для отображения наиболее распространенных имен и их счетчиков в порядке убывания.Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Создание подмножества кадра данных
Узнайте, как создать подмножество кадра данных из существующего кадра данных.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark
filter
для создания нового кадра данных, ограничивающего данные по годам, подсчетам и сексу. Он использует метод Apache Sparkselect()
для ограничения столбцов. Он также использует Apache SparkorderBy()
иdesc()
функции для сортировки нового кадра данных по количеству.Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Шаг 5. Сохранение кадра данных
Узнайте, как сохранить кадр данных. Вы можете сохранить кадр данных в таблицу или записать кадр данных в файл или несколько файлов.
Сохраните DataFrame в таблицу.
Azure Databricks использует формат Delta Lake для всех таблиц по умолчанию. Чтобы сохранить DataFrame, необходимо иметь привилегии таблицы CREATE
в каталоге и схеме.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код сохраняет данные DataFrame в таблицу с использованием переменной, которую вы определили в начале этого руководства.
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Большинство приложений Apache Spark работают над большими наборами данных и распределенной модой. Apache Spark записывает каталог файлов, а не один файл. Delta Lake разделяет папки и файлы Parquet. Многие системы данных могут считывать эти каталоги файлов. Azure Databricks рекомендует использовать таблицы по пути к файлам для большинства приложений.
Сохранение кадра данных в JSON-файлы
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код сохраняет кадр данных в каталог JSON-файлов.
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Чтение кадра данных из JSON-файла
Узнайте, как использовать метод Apache Spark spark.read.format()
для чтения данных JSON из каталога в кадр данных.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код отображает файлы JSON, сохраненные в предыдущем примере.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Дополнительные задачи: выполнение запросов SQL в PySpark, Scala и R
Кадры данных Apache Spark предоставляют следующие параметры для объединения SQL с PySpark, Scala и R. Приведенный ниже код можно запустить в той же записной книжке, которую вы создали для этого руководства.
Указание столбца в виде SQL-запроса
Узнайте, как использовать метод Apache Spark selectExpr()
. Это вариант select()
метода, который принимает выражения SQL и возвращает обновленный кадр данных. Этот метод позволяет использовать выражение SQL, например upper
.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод
selectExpr()
Apache Spark и выражение SQLupper
для преобразования строкового столбца в верхний регистр (и переименования столбца).Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Использование expr()
для использования синтаксиса SQL для столбца
Узнайте, как импортировать и использовать функцию Apache Spark expr()
для использования синтаксиса SQL в любом месте, где будет указан столбец.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код импортирует функцию
expr()
, а затем использует функцию Apache Sparkexpr()
и выражение SQLlower
для преобразования строкового столбца в нижний регистр (и переименования столбца).Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Выполнение произвольного SQL-запроса с помощью функции spark.sql()
Узнайте, как использовать функцию Apache Spark spark.sql()
для выполнения произвольных запросов SQL.
Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует функцию Apache Spark
spark.sql()
для запроса таблицы SQL с помощью синтаксиса SQL.Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Нажмите
Shift+Enter
, чтобы запустить ячейку, а затем перейдите к следующей ячейке.
Записные книжки учебника по кадрам данных
В следующих записных книжках приведены примеры запросов из этого руководства.