Анализ данных с помощью Spark
Одним из преимуществ использования Spark является возможность написания и выполнения кода на различных языках программирования, что позволяет использовать существующие навыки программирования и выбрать наиболее подходящий язык для данной задачи. Язык по умолчанию в новой записной книжке Azure Synapse Analytics Spark — это pySpark, оптимизированная для Spark версия Python, которую специалисты по обработке и анализу данных обычно выбирают благодаря поддержке обработки и визуализации данных. Кроме того, можно использовать такие языки, как Scala (производный от Java язык, который можно использовать в интерактивном режиме) и SQL (вариант часто используемого языка SQL, включенного в библиотеку Spark SQL для работы с реляционными структурами данных). Инженеры программного обеспечения также могут создавать скомпилированные решения, которые выполняются в Spark с помощью таких платформ, как Java и Microsoft .NET.
Изучение данных с помощью кадров данных
В собственном коде Spark используется структура данных, называемая отказоустойчивым распределенным набором данных (RDD); но хотя вы можете написать код, который работает непосредственно с RDD, чаще всего для работы со структурированными данными в Spark используется кадр данных, предоставляемый в составе библиотеки Spark SQL. Кадры данных в Spark аналогичны тем, которые используются в универсальной библиотеке Pandas Python, но при этом они оптимизированы для работы в распределенной среде обработки Spark.
Примечание.
В дополнение к API кадра данных Spark SQL предоставляет строго типизированный API набора данных, который поддерживается в Java и Scala. В этом модуле мы будем в первую очередь рассматривать API кадра данных.
Загрузка данных в кадр данных
Рассмотрим гипотетический пример использования кадра данных для работы с данными. Предположим, в вашей основной учетной записи хранения для рабочей области Azure Synapse Analytics есть следующие данные в текстовом файле с разделением запятыми с именем products.csv:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
В записной книжке Spark можно использовать следующий код PySpark для загрузки данных в кадр данных и отображения первых 10 строк:
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Строка %%pyspark
в начале называется магической командой, которая сообщает Spark, что в этой ячейке используется язык PySpark. Вы можете выбрать язык, который нужно использовать по умолчанию, на панели инструментов интерфейса записной книжки, а затем использовать магическую команду для переопределения выбранной ячейки. Например, ниже приведен эквивалентный код Scala для примера данных продуктов:
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
Магическая команда %%spark
используется для указания Scala.
В обоих примерах кода выходные данные создаются следующим образом:
ProductID | НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
772 | Mountain-100 Silver, 42 | Горные велосипеды | 3399.9900 |
773 | Mountain-100 Silver, 44 | Горные велосипеды | 3399.9900 |
... | ... | ... | ... |
Указание схемы кадра данных
В предыдущем примере первая строка CSV-файла содержала имена столбцов, а Spark могла определять тип данных в каждом столбце на основе содержащихся в нем данных. Вы также можете указать явную схему для данных; это удобно в том случае, если имена столбцов не включены в файл данных, как в этом примере CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
В следующем примере PySpark показано, как указать схему для кадра данных, загружаемого из файла с именем product-data.csv в следующем формате:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Результаты будут снова похожи на:
ProductID | НаименованиеПродукта | Категория | ПрейскурантнаяЦена |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Горные велосипеды | 3399.9900 |
772 | Mountain-100 Silver, 42 | Горные велосипеды | 3399.9900 |
773 | Mountain-100 Silver, 44 | Горные велосипеды | 3399.9900 |
... | ... | ... | ... |
Фильтрация и группировка кадров данных
Методы класса Dataframe можно использовать для фильтрации, сортировки, группировки и обработки содержащихся в нем данных. Например, в следующем примере кода используется метод select для получения столбцов ProductName и ListPrice из кадра данных df, где содержатся данные продукта, рассматриваемого в предыдущем примере:
pricelist_df = df.select("ProductID", "ListPrice")
Результаты для этого примера кода будут выглядеть примерно так:
ProductID | ПрейскурантнаяЦена |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Как правило, при использовании большинства методов обработки данных метод select возвращает новый объект кадра данных.
Совет
Выбор вложенного набора столбцов из кадра данных — это распространенная операция, которую также можно выполнить с помощью следующего, более короткого синтаксиса:
pricelist_df = df["ProductID", "ListPrice"]
Можно объединить методы в цепочку для выполнения ряда операций, которые приводят к преобразованию кадра данных. Например, в этом примере кода показано объединение в цепочку методов select и where для создания нового кадра данных, содержащего столбцы ProductName и ListPrice для продуктов в категории Mountain Bikes или Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Результаты для этого примера кода будут выглядеть примерно так:
НаименованиеПродукта | ПрейскурантнаяЦена |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Для группирования и агрегирования данных можно использовать метод groupBy и статистические функции. Например, следующий код PySpark подсчитывает количество продуктов для каждой категории:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Результаты для этого примера кода будут выглядеть примерно так:
Категория | count |
---|---|
Рулевые колонки | 3 |
Колеса | 14 |
Горные велосипеды | 32 |
... | ... |
Использование выражений SQL в Spark
API кадра данных входит в библиотеку Spark с именем Spark SQL, позволяя аналитикам данных использовать выражения SQL для запроса данных и управления ими.
Создание объектов базы данных в каталоге Spark
Каталог Spark — это хранилище метаданных для реляционных объектов данных, таких как представления и таблицы. Среда выполнения Spark может использовать каталог для простой и эффективной интеграции кода, написанного на любом языке, поддерживаемом Spark, с использованием выражений SQL, которые будут более привычными для многих аналитиков и разработчиков данных.
Одним из самых простых способов сделать данные в кадре данных доступными для запроса в каталоге Spark, является создание временного представления, как показано в следующем примере кода:
df.createOrReplaceTempView("products")
Представление является временным, то есть оно автоматически удаляется в конце текущего сеанса. Вы также можете создавать таблицы, которые сохраняются в каталоге, чтобы определить базу данных, которую можно запросить с помощью Spark SQL.
Примечание.
Мы не будем подробно изучать таблицы каталога Spark в этом модуле, однако стоит выделить несколько ключевых моментов:
- Можно создать пустую таблицу с помощью метода
spark.catalog.createTable
. Таблицы — это структуры метаданных, базовые данные которых хранятся в расположении хранилища, связанном с каталогом. При удалении таблицы также удаляются базовые данные. - Кадр данных можно сохранить в виде таблицы с помощью метода
saveAsTable
. - Можно создать внешнюю таблицу с помощью метода
spark.catalog.createExternalTable
. Внешние таблицы определяют метаданные в каталоге, но получают их базовые данные из внешнего расположения хранилища; обычно это папка в озере данных. При удалении внешней таблицы базовые данные не удаляются.
Использование API SQL Spark для запроса данных
API SQL Spark можно использовать в коде, написанном на любом языке, чтобы запрашивать данные в каталоге. Например, следующий код PySpark использует запрос SQL для возврата данных из представления продуктов в виде кадра данных.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Результаты из примера кода будут выглядеть примерно так:
ProductID | НаименованиеПродукта | ПрейскурантнаяЦена |
---|---|---|
38 | Mountain-100 Silver, 38 | 3399.9900 |
52 | Road-750 Black, 52 | 539.9900 |
... | ... | ... |
Использование кода SQL
В предыдущем примере показано, как использовать API SQL Spark для внедрения выражений SQL в код Spark. В записной книжке можно также использовать магическую команду %%sql
для запуска SQL кода, который запрашивает объекты в каталоге, примерно следующим образом:
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
В примере кода SQL возвращается набор результатов, который автоматически отображается в записной книжке в виде таблицы, как показано ниже.
Категория | ProductCount |
---|---|
Велошорты | 3 |
Багажники для велосипедов | 1 |
Велосипедные стойки | 1 |
... | ... |