Работа с данными в кадре данных Spark

Завершено

В предыдущем уроке вы узнали, как подключиться к источнику данных, загрузить данные в кадр данных и при необходимости сохранить кадр данных в lakehouse в виде файла или таблицы. Теперь рассмотрим кадр данных немного подробнее.

Изначально Spark использует структуру данных, называемую отказоустойчивым распределенным набором данных (RDD), но в то время как вы можете писать код, который работает непосредственно с удаленными удаленными рабочими столами, наиболее часто используется для работы с структурированными данными в Spark— это кадр данных, предоставляемый в рамках библиотеки SQL Spark. Кадры данных в Spark аналогичны тем, которые используются в универсальной библиотеке Pandas Python, но при этом они оптимизированы для работы в распределенной среде обработки Spark.

Примечание.

В дополнение к API кадра данных Spark SQL предоставляет строго типизированный API набора данных, который поддерживается в Java и Scala. В этом модуле мы будем в первую очередь рассматривать API кадра данных.

Загрузка данных в кадр данных

Рассмотрим гипотетический пример использования кадра данных для работы с данными. Предположим, у вас есть следующие данные в текстовом файле с разделителями-запятыми с именем products.csv в папке Files/data в lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Вывод схемы

В записной книжке Spark можно использовать следующий код PySpark для загрузки данных файла в кадр данных и отображения первых 10 строк:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Как вы узнали ранее, линия в начале называется магией и сообщает Spark, %%pyspark что язык, используемый в этой ячейке, — PySpark. В большинстве случаев PySpark является языком по умолчанию; и мы обычно будем придерживаться его в примерах в этом модуле. Однако для полноты ниже приведен эквивалентный код Scala для примера данных продуктов:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

Магия %%spark используется для указания Spark в ячейке кода Scala. Обратите внимание, что реализация Scala кадра данных работает аналогично версии PySpark.

В обоих примерах кода выходные данные создаются следующим образом:

ProductID НаименованиеПродукта Категория ПрейскурантнаяЦена
771 Mountain-100 Silver, 38 Горные велосипеды 3399.9900
772 Mountain-100 Silver, 42 Горные велосипеды 3399.9900
773 Mountain-100 Silver, 44 Горные велосипеды 3399.9900
... ... ... ...

Указание явной схемы

В предыдущем примере первая строка CSV-файла содержала имена столбцов, а Spark могла определять тип данных в каждом столбце на основе содержащихся в нем данных. Вы также можете указать явную схему для данных; это удобно в том случае, если имена столбцов не включены в файл данных, как в этом примере CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

В следующем примере PySpark показано, как указать схему для кадра данных, загружаемого из файла с именем product-data.csv в следующем формате:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Результаты будут снова похожи на:

ProductID НаименованиеПродукта Категория ПрейскурантнаяЦена
771 Mountain-100 Silver, 38 Горные велосипеды 3399.9900
772 Mountain-100 Silver, 42 Горные велосипеды 3399.9900
773 Mountain-100 Silver, 44 Горные велосипеды 3399.9900
... ... ... ...

Совет

Указание явной схемы также повышает производительность!

Фильтрация и группировка кадров данных

Методы класса Dataframe можно использовать для фильтрации, сортировки, группировки и обработки содержащихся в нем данных. Например, в следующем примере кода используется метод select для получения столбцов ProductID и ListPrice из кадра данных df , содержащего данные продукта в предыдущем примере:

pricelist_df = df.select("ProductID", "ListPrice")

Результаты для этого примера кода будут выглядеть примерно так:

ProductID ПрейскурантнаяЦена
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Как правило, при использовании большинства методов обработки данных метод select возвращает новый объект кадра данных.

Совет

Выбор вложенного набора столбцов из кадра данных — это распространенная операция, которую также можно выполнить с помощью следующего, более короткого синтаксиса:

pricelist_df = df["ProductID", "ListPrice"]

Можно объединить методы в цепочку для выполнения ряда операций, которые приводят к преобразованию кадра данных. Например, в этом примере кода показано объединение в цепочку методов select и where для создания нового кадра данных, содержащего столбцы ProductName и ListPrice для продуктов в категории Mountain Bikes или Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Результаты для этого примера кода будут выглядеть примерно так:

НаименованиеПродукта Категория ПрейскурантнаяЦена
Mountain-100 Silver, 38 Горные велосипеды 3399.9900
Road-750 Black, 52 Шоссейные велосипеды 539.9900
... ... ...

Для группирования и агрегирования данных можно использовать метод groupBy и статистические функции. Например, следующий код PySpark подсчитывает количество продуктов для каждой категории:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Результаты для этого примера кода будут выглядеть примерно так:

Категория count
Рулевые колонки 3
Колеса 14
Горные велосипеды 32
... ...