Работа с данными в кадре данных Spark
В предыдущем уроке вы узнали, как подключиться к источнику данных, загрузить данные в кадр данных и при необходимости сохранить кадр данных в lakehouse в виде файла или таблицы. Теперь рассмотрим кадр данных немного подробнее.
Изначально Spark использует структуру данных, называемую отказоустойчивым распределенным набором данных (RDD), но в то время как вы можете писать код, который работает непосредственно с удаленными удаленными рабочими столами, наиболее часто используется для работы с структурированными данными в Spark— это кадр данных, предоставляемый в рамках библиотеки SQL Spark. Кадры данных в Spark аналогичны тем, которые используются в универсальной библиотеке Pandas Python, но при этом они оптимизированы для работы в распределенной среде обработки Spark.
Примечание.
В дополнение к API кадра данных Spark SQL предоставляет строго типизированный API набора данных, который поддерживается в Java и Scala. В этом модуле мы будем в первую очередь рассматривать API кадра данных.
Загрузка данных в кадр данных
Рассмотрим гипотетический пример использования кадра данных для работы с данными. Предположим, у вас есть следующие данные в текстовом файле с разделителями-запятыми с именем products.csv в папке Files/data в lakehouse:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Вывод схемы
В записной книжке Spark можно использовать следующий код PySpark для загрузки данных файла в кадр данных и отображения первых 10 строк:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Как вы узнали ранее, линия в начале называется магией и сообщает Spark, %%pyspark
что язык, используемый в этой ячейке, — PySpark. В большинстве случаев PySpark является языком по умолчанию; и мы обычно будем придерживаться его в примерах в этом модуле. Однако для полноты ниже приведен эквивалентный код Scala для примера данных продуктов:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
Магия %%spark
используется для указания Spark в ячейке кода Scala. Обратите внимание, что реализация Scala кадра данных работает аналогично версии PySpark.
В обоих примерах кода выходные данные создаются следующим образом:
ProductID | НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
772 | Mountain-100 Silver, 42 | Горные велосипеды | 3399.9900 |
773 | Mountain-100 Silver, 44 | Горные велосипеды | 3399.9900 |
... | ... | ... | ... |
Указание явной схемы
В предыдущем примере первая строка CSV-файла содержала имена столбцов, а Spark могла определять тип данных в каждом столбце на основе содержащихся в нем данных. Вы также можете указать явную схему для данных; это удобно в том случае, если имена столбцов не включены в файл данных, как в этом примере CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
В следующем примере PySpark показано, как указать схему для кадра данных, загружаемого из файла с именем product-data.csv в следующем формате:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Результаты будут снова похожи на:
ProductID | НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
772 | Mountain-100 Silver, 42 | Горные велосипеды | 3399.9900 |
773 | Mountain-100 Silver, 44 | Горные велосипеды | 3399.9900 |
... | ... | ... | ... |
Совет
Указание явной схемы также повышает производительность!
Фильтрация и группировка кадров данных
Методы класса Dataframe можно использовать для фильтрации, сортировки, группировки и обработки содержащихся в нем данных. Например, в следующем примере кода используется метод select для получения столбцов ProductID и ListPrice из кадра данных df , содержащего данные продукта в предыдущем примере:
pricelist_df = df.select("ProductID", "ListPrice")
Результаты для этого примера кода будут выглядеть примерно так:
ProductID | ПрейскурантнаяЦена |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Как правило, при использовании большинства методов обработки данных метод select возвращает новый объект кадра данных.
Совет
Выбор вложенного набора столбцов из кадра данных — это распространенная операция, которую также можно выполнить с помощью следующего, более короткого синтаксиса:
pricelist_df = df["ProductID", "ListPrice"]
Можно объединить методы в цепочку для выполнения ряда операций, которые приводят к преобразованию кадра данных. Например, в этом примере кода показано объединение в цепочку методов select и where для создания нового кадра данных, содержащего столбцы ProductName и ListPrice для продуктов в категории Mountain Bikes или Road Bikes:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Результаты для этого примера кода будут выглядеть примерно так:
НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|
Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
Road-750 Black, 52 | Шоссейные велосипеды | 539.9900 |
... | ... | ... |
Для группирования и агрегирования данных можно использовать метод groupBy и статистические функции. Например, следующий код PySpark подсчитывает количество продуктов для каждой категории:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Результаты для этого примера кода будут выглядеть примерно так:
Категория | count |
---|---|
Рулевые колонки | 3 |
Колеса | 14 |
Горные велосипеды | 32 |
... | ... |