Работа с данными в кадре данных Spark

Завершено

В собственном коде Spark используется структура данных, называемая отказоустойчивым распределенным набором данных (RDD); но хотя вы можете написать код, который работает непосредственно с RDD, чаще всего для работы со структурированными данными в Spark используется кадр данных, предоставляемый в составе библиотеки Spark SQL. Кадры данных в Spark аналогичны тем, которые используются в универсальной библиотеке Pandas Python, но при этом они оптимизированы для работы в распределенной среде обработки Spark.

Примечание.

В дополнение к API кадра данных Spark SQL предоставляет строго типизированный API набора данных, который поддерживается в Java и Scala. В этом модуле мы будем в первую очередь рассматривать API кадра данных.

Загрузка данных в кадр данных

Рассмотрим гипотетический пример использования кадра данных для работы с данными. Предположим, у вас есть следующие данные в текстовом файле с разделителями-запятыми с именем products.csv в папке Files/data в lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Вывод схемы

В записной книжке Spark можно использовать следующий код PySpark для загрузки данных файла в кадр данных и отображения первых 10 строк:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Строка %%pyspark в начале называется магической командой, которая сообщает Spark, что в этой ячейке используется язык PySpark. Вы можете выбрать язык, который нужно использовать по умолчанию, на панели инструментов интерфейса записной книжки, а затем использовать магическую команду для переопределения выбранной ячейки. Например, ниже приведен эквивалентный код Scala для примера данных продуктов:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

Магическая команда %%spark используется для указания Scala.

В обоих примерах кода выходные данные создаются следующим образом:

ProductID НаименованиеПродукта Категория ПрейскурантнаяЦена
771 Mountain-100 Silver, 38 Горные велосипеды 3399.9900
772 Mountain-100 Silver, 42 Горные велосипеды 3399.9900
773 Mountain-100 Silver, 44 Горные велосипеды 3399.9900
... ... ... ...

Указание явной схемы

В предыдущем примере первая строка CSV-файла содержала имена столбцов, а Spark могла определять тип данных в каждом столбце на основе содержащихся в нем данных. Вы также можете указать явную схему для данных; это удобно в том случае, если имена столбцов не включены в файл данных, как в этом примере CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

В следующем примере PySpark показано, как указать схему для кадра данных, загружаемого из файла с именем product-data.csv в следующем формате:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Результаты будут снова похожи на:

ProductID НаименованиеПродукта Категория ПрейскурантнаяЦена
771 Mountain-100 Silver, 38 Горные велосипеды 3399.9900
772 Mountain-100 Silver, 42 Горные велосипеды 3399.9900
773 Mountain-100 Silver, 44 Горные велосипеды 3399.9900
... ... ... ...

Совет

Указание явной схемы также повышает производительность!

Фильтрация и группировка кадров данных

Методы класса Dataframe можно использовать для фильтрации, сортировки, группировки и обработки содержащихся в нем данных. Например, в следующем примере кода используется метод select для получения столбцов ProductID и ListPrice из кадра данных df , содержащего данные продукта в предыдущем примере:

pricelist_df = df.select("ProductID", "ListPrice")

Результаты для этого примера кода будут выглядеть примерно так:

ProductID ПрейскурантнаяЦена
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Как правило, при использовании большинства методов обработки данных метод select возвращает новый объект кадра данных.

Совет

Выбор вложенного набора столбцов из кадра данных — это распространенная операция, которую также можно выполнить с помощью следующего, более короткого синтаксиса:

pricelist_df = df["ProductID", "ListPrice"]

Можно объединить методы в цепочку для выполнения ряда операций, которые приводят к преобразованию кадра данных. Например, в этом примере кода показано объединение в цепочку методов select и where для создания нового кадра данных, содержащего столбцы ProductName и ListPrice для продуктов в категории Mountain Bikes или Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Результаты для этого примера кода будут выглядеть примерно так:

НаименованиеПродукта Категория ПрейскурантнаяЦена
Mountain-100 Silver, 38 Горные велосипеды 3399.9900
Road-750 Black, 52 Шоссейные велосипеды 539.9900
... ... ...

Для группирования и агрегирования данных можно использовать метод groupBy и статистические функции. Например, следующий код PySpark подсчитывает количество продуктов для каждой категории:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Результаты для этого примера кода будут выглядеть примерно так:

Категория count
Рулевые колонки 3
Колеса 14
Горные велосипеды 32
... ...

Сохранение кадра данных

Часто требуется использовать Spark для преобразования необработанных данных и сохранения результатов для дальнейшего анализа или последующей обработки вниз. В следующем примере кода кадр данных сохраняется в файл parquet в озере данных, заменив существующий файл того же имени.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Примечание.

Формат Parquet обычно предпочтителен для файлов данных, которые будут использоваться для дальнейшего анализа или приема в аналитическое хранилище. Parquet — это очень эффективный формат, поддерживаемый большинством крупномасштабных систем аналитики данных. В самом деле, иногда требование преобразования данных может быть просто для преобразования данных из другого формата (например, CSV) в Parquet!

Секционирование выходного файла

Секционирование — это метод оптимизации, позволяющий Spark повысить производительность на рабочих узлах. При фильтрации данных в запросах можно добиться большего повышения производительности, устраняя ненужные операции ввода-вывода на диск.

Чтобы сохранить кадр данных в виде секционированного набора файлов, используйте метод partitionBy при записи данных. В следующем примере сохраняется bikes_df кадр данных (который содержит данные продукта для горных велосипедов и категорий дорожных велосипедов) и секционирует данные по категориям:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Имена папок, созданные при секционирования кадра данных, включают имя столбца секционирования и значение в формате column=value, поэтому в примере кода создается папка с именем bike_data, содержащая следующие вложенные папки:

  • Category=Mountain Bikes
  • Category=Road Bikes

Каждая вложенная папка содержит один или несколько файлов parquet с данными продукта для соответствующей категории.

Примечание.

Данные можно секционированием по нескольким столбцам, что приводит к иерархии папок для каждого ключа секционирования. Например, можно секционирование данных заказа на продажу по годам и месяцам, чтобы иерархия папок содержала папку для каждого года, которая, в свою очередь, содержит вложенную папку для каждого значения месяца.

Загрузка секционированных данных

При чтении секционированных данных в кадр данных можно загрузить данные из любой папки в иерархии, указав явные значения или подстановочные знаки для секционированных полей. Следующий пример загружает данные для продуктов в категории Road Bikes :

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Примечание.

Столбцы секционирования, указанные в пути к файлу, опущены в результирующем кадре данных. Результаты, полученные в примере запроса, не включают столбец категории . Категория для всех строк будет Road Bikes.