Работа с данными в кадре данных Spark
В собственном коде Spark используется структура данных, называемая отказоустойчивым распределенным набором данных (RDD); но хотя вы можете написать код, который работает непосредственно с RDD, чаще всего для работы со структурированными данными в Spark используется кадр данных, предоставляемый в составе библиотеки Spark SQL. Кадры данных в Spark аналогичны тем, которые используются в универсальной библиотеке Pandas Python, но при этом они оптимизированы для работы в распределенной среде обработки Spark.
Примечание.
В дополнение к API кадра данных Spark SQL предоставляет строго типизированный API набора данных, который поддерживается в Java и Scala. В этом модуле мы будем в первую очередь рассматривать API кадра данных.
Загрузка данных в кадр данных
Рассмотрим гипотетический пример использования кадра данных для работы с данными. Предположим, у вас есть следующие данные в текстовом файле с разделителями-запятыми с именем products.csv в папке Files/data в lakehouse:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Вывод схемы
В записной книжке Spark можно использовать следующий код PySpark для загрузки данных файла в кадр данных и отображения первых 10 строк:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Строка %%pyspark
в начале называется магической командой, которая сообщает Spark, что в этой ячейке используется язык PySpark. Вы можете выбрать язык, который нужно использовать по умолчанию, на панели инструментов интерфейса записной книжки, а затем использовать магическую команду для переопределения выбранной ячейки. Например, ниже приведен эквивалентный код Scala для примера данных продуктов:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
Магическая команда %%spark
используется для указания Scala.
В обоих примерах кода выходные данные создаются следующим образом:
ProductID | НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
772 | Mountain-100 Silver, 42 | Горные велосипеды | 3399.9900 |
773 | Mountain-100 Silver, 44 | Горные велосипеды | 3399.9900 |
... | ... | ... | ... |
Указание явной схемы
В предыдущем примере первая строка CSV-файла содержала имена столбцов, а Spark могла определять тип данных в каждом столбце на основе содержащихся в нем данных. Вы также можете указать явную схему для данных; это удобно в том случае, если имена столбцов не включены в файл данных, как в этом примере CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
В следующем примере PySpark показано, как указать схему для кадра данных, загружаемого из файла с именем product-data.csv в следующем формате:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Результаты будут снова похожи на:
ProductID | НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
772 | Mountain-100 Silver, 42 | Горные велосипеды | 3399.9900 |
773 | Mountain-100 Silver, 44 | Горные велосипеды | 3399.9900 |
... | ... | ... | ... |
Совет
Указание явной схемы также повышает производительность!
Фильтрация и группировка кадров данных
Методы класса Dataframe можно использовать для фильтрации, сортировки, группировки и обработки содержащихся в нем данных. Например, в следующем примере кода используется метод select для получения столбцов ProductID и ListPrice из кадра данных df , содержащего данные продукта в предыдущем примере:
pricelist_df = df.select("ProductID", "ListPrice")
Результаты для этого примера кода будут выглядеть примерно так:
ProductID | ПрейскурантнаяЦена |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Как правило, при использовании большинства методов обработки данных метод select возвращает новый объект кадра данных.
Совет
Выбор вложенного набора столбцов из кадра данных — это распространенная операция, которую также можно выполнить с помощью следующего, более короткого синтаксиса:
pricelist_df = df["ProductID", "ListPrice"]
Можно объединить методы в цепочку для выполнения ряда операций, которые приводят к преобразованию кадра данных. Например, в этом примере кода показано объединение в цепочку методов select и where для создания нового кадра данных, содержащего столбцы ProductName и ListPrice для продуктов в категории Mountain Bikes или Road Bikes:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Результаты для этого примера кода будут выглядеть примерно так:
НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|
Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
Road-750 Black, 52 | Шоссейные велосипеды | 539.9900 |
... | ... | ... |
Для группирования и агрегирования данных можно использовать метод groupBy и статистические функции. Например, следующий код PySpark подсчитывает количество продуктов для каждой категории:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Результаты для этого примера кода будут выглядеть примерно так:
Категория | count |
---|---|
Рулевые колонки | 3 |
Колеса | 14 |
Горные велосипеды | 32 |
... | ... |
Сохранение кадра данных
Часто требуется использовать Spark для преобразования необработанных данных и сохранения результатов для дальнейшего анализа или последующей обработки вниз. В следующем примере кода кадр данных сохраняется в файл parquet в озере данных, заменив существующий файл того же имени.
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
Примечание.
Формат Parquet обычно предпочтителен для файлов данных, которые будут использоваться для дальнейшего анализа или приема в аналитическое хранилище. Parquet — это очень эффективный формат, поддерживаемый большинством крупномасштабных систем аналитики данных. В самом деле, иногда требование преобразования данных может быть просто для преобразования данных из другого формата (например, CSV) в Parquet!
Секционирование выходного файла
Секционирование — это метод оптимизации, позволяющий Spark повысить производительность на рабочих узлах. При фильтрации данных в запросах можно добиться большего повышения производительности, устраняя ненужные операции ввода-вывода на диск.
Чтобы сохранить кадр данных в виде секционированного набора файлов, используйте метод partitionBy при записи данных. В следующем примере сохраняется bikes_df кадр данных (который содержит данные продукта для горных велосипедов и категорий дорожных велосипедов) и секционирует данные по категориям:
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
Имена папок, созданные при секционирования кадра данных, включают имя столбца секционирования и значение в формате column=value, поэтому в примере кода создается папка с именем bike_data, содержащая следующие вложенные папки:
- Category=Mountain Bikes
- Category=Road Bikes
Каждая вложенная папка содержит один или несколько файлов parquet с данными продукта для соответствующей категории.
Примечание.
Данные можно секционированием по нескольким столбцам, что приводит к иерархии папок для каждого ключа секционирования. Например, можно секционирование данных заказа на продажу по годам и месяцам, чтобы иерархия папок содержала папку для каждого года, которая, в свою очередь, содержит вложенную папку для каждого значения месяца.
Загрузка секционированных данных
При чтении секционированных данных в кадр данных можно загрузить данные из любой папки в иерархии, указав явные значения или подстановочные знаки для секционированных полей. Следующий пример загружает данные для продуктов в категории Road Bikes :
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
Примечание.
Столбцы секционирования, указанные в пути к файлу, опущены в результирующем кадре данных. Результаты, полученные в примере запроса, не включают столбец категории . Категория для всех строк будет Road Bikes.