Поделиться через


Основы PySpark

В этой статье рассматриваются простые примеры использования PySpark. Предполагается, что вы понимаете основные понятия Apache Spark и выполняете команды в записной книжке Azure Databricks, подключенной к вычислениям. Вы создаете кадры данных с помощью примеров данных, выполняете основные преобразования, включая операции строк и столбцов с данными, объединяете несколько кадров данных и объединяете эти данные, визуализируете эти данные, а затем сохраняете их в таблице или файле.

Отправка данных

В некоторых примерах в этой статье используются примеры данных, предоставленные Databricks, для демонстрации загрузки, преобразования и сохранения данных с помощью кадров данных. Если вы хотите использовать собственные данные, которые еще не в Databricks, вы можете сначала отправить его и создать кадр данных из него. См. статью "Создание или изменение таблицы с помощью отправки и отправки файлов в том каталога Unity".

Сведения о примерах данных Databricks

Databricks предоставляет примеры данных в каталоге samples и в каталоге /databricks-datasets .

  • Чтобы получить доступ к примеру данных в каталоге samples , используйте формат samples.<schema-name>.<table-name>. В этой статье используются таблицы схемы samples.tpch , содержащие данные из вымышленного бизнеса. Таблица customer содержит сведения о клиентах и orders содержит сведения о заказах, размещенных этими клиентами.
  • Использование dbutils.fs.ls для изучения данных в /databricks-datasets. Используйте Spark SQL или DataFrames для запроса данных в этом расположении с помощью путей к файлам. Дополнительные сведения о примерах данных, предоставленных Databricks, см. в разделе "Примеры наборов данных".

Импорт типов данных

Многие операции PySpark требуют использования функций SQL или взаимодействия с собственными типами Spark. Вы можете импортировать только те функции и типы, которые вам нужны, или импортировать весь модуль.

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

Так как некоторые импортированные функции могут переопределить встроенные функции Python, некоторые пользователи предпочитают импортировать эти модули с помощью псевдонима. В следующих примерах показан распространенный псевдоним, используемый в примерах кода Apache Spark:

import pyspark.sql.types as T
import pyspark.sql.functions as F

Полный список типов данных см. в разделе "Типы данных Spark".

Полный список функций PySpark SQL см. в разделе "Функции Spark".

Создание кадра данных

Существует несколько способов создания кадра данных. Обычно вы определяете кадр данных для источника данных, например таблицы или коллекции файлов. Затем, как описано в разделе основных понятий Apache Spark, используйте действие, напримерdisplay, для активации преобразований для выполнения. Метод display выводит кадры данных.

Создание кадра данных с указанными значениями

Чтобы создать кадр данных с указанными значениями, используйте createDataFrame метод, в котором строки выражаются в виде списка кортежей:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Обратите внимание, что типы данных столбцов df_children автоматически выводятся. Кроме того, можно указать типы, добавив схему. Схемы определяются с помощью StructType того, что определяет StructFields имя, тип данных и логический флаг, указывающий, содержит ли они значение NULL или нет. Необходимо импортировать типы данных из pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Создание кадра данных из таблицы в каталоге Unity

Чтобы создать кадр данных из таблицы в каталоге Unity, используйте table метод, определяющий таблицу с помощью формата <catalog-name>.<schema-name>.<table-name>. Щелкните каталог на левой панели навигации, чтобы использовать обозреватель каталогов для перехода к таблице. Щелкните его, а затем выберите путь к таблице, чтобы вставить путь к таблице в записную книжку.

В следующем примере загружается таблица samples.tpch.customer, но можно также указать путь к собственной таблице.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Создание кадра данных из отправленного файла

Чтобы создать кадр данных из файла, отправленного в тома каталога Unity, используйте read это свойство. Этот метод возвращает значение DataFrameReader, которое можно использовать для чтения соответствующего формата. Щелкните параметр каталога на небольшой боковой панели слева и используйте браузер каталога для поиска файла. Выберите его, а затем нажмите кнопку "Копировать путь к файлу тома".

В приведенном *.csv ниже примере считывается из файла, но DataFrameReader поддерживается отправка файлов во многих других форматах. См . методы DataFrameReader.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Дополнительные сведения о томах каталога Unity см. в разделе "Что такое тома каталога Unity?".

Создание кадра данных из ответа JSON

Чтобы создать кадр данных из полезных данных ответа JSON, возвращаемых REST API, используйте пакет Python requests для запроса и анализа ответа. Необходимо импортировать пакет для его использования. В этом примере используются данные из базы данных приложений США по контролю за наркотиками для управления продуктами и наркотиками.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Сведения о работе с JSON и другими частично структурированными данными в Databricks см. в разделе "Модель полуструктурированных данных".

Выбор поля или объекта JSON

Чтобы выбрать определенное поле или объект из преобразованного JSON, используйте [] нотацию. Например, чтобы выбрать products поле, которое является массивом продуктов:

display(df_drugs.select(df_drugs["products"]))

Можно также объединить вызовы методов для обхода нескольких полей. Например, чтобы вывести название торговой марки первого продукта в приложении наркотиков:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Создание кадра данных из файла

Чтобы продемонстрировать создание кадра данных из файла, в этом примере загружаются данные CSV в /databricks-datasets каталоге.

Чтобы перейти к примерам наборов данных, можно использовать команды файловой системы Databricks Utilties . Следующий пример используется dbutils для перечисления наборов данных, доступных в /databricks-datasets:

display(dbutils.fs.ls('/databricks-datasets'))

Кроме того, можно использовать %fs для доступа к командам файловой системы Databricks CLI, как показано в следующем примере:

%fs ls '/databricks-datasets'

Чтобы создать кадр данных из файла или каталога файлов, укажите путь в методе load :

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

Преобразование данных с помощью кадров данных

Кадры данных упрощают преобразование данных с помощью встроенных методов для сортировки, фильтрации и статистической обработки данных. Многие преобразования не указываются как методы в кадрах данных, но вместо этого предоставляются в пакете spark.sql.functions . См. сведения о функциях SQL Databricks Spark.

Операции столбцов

Spark предоставляет множество основных операций столбцов:

Совет

Чтобы вывести все столбцы в кадре данных, используйте columns, например df_customer.columns.

Выбор столбцов

Вы можете выбрать определенные столбцы с помощью select и col. Функция col находится в подмодуле pyspark.sql.functions .

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Можно также ссылаться на столбец, expr который принимает выражение, определенное как строка:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

Вы также можете использовать selectExpr, который принимает выражения SQL:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Чтобы выбрать столбцы с помощью строкового литерала, сделайте следующее:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Чтобы явно выбрать столбец из определенного кадра данных, можно использовать [] оператор или . оператор. (Оператор . не может использоваться для выбора столбцов, начиная с целого числа, или тех, которые содержат пробел или специальный символ.) Это может быть особенно полезно при присоединении к кадрам данных, где некоторые столбцы имеют то же имя.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Создание столбцов

Чтобы создать новый столбец, используйте withColumn метод. В следующем примере создается новый столбец, содержащий логическое значение на основе превышения 1000баланса c_acctbal учетной записи клиента:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Переименование столбцов

Чтобы переименовать столбец, используйте withColumnRenamed метод, который принимает существующие и новые имена столбцов:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Этот alias метод особенно полезен, если вы хотите переименовать столбцы в составе агрегатов:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Типы столбцов приведения

В некоторых случаях может потребоваться изменить тип данных для одного или нескольких столбцов в кадре данных. Для этого используйте cast метод для преобразования между типами данных столбцов. В следующем примере показано, как преобразовать столбец из целого числа в тип строки, используя col метод для ссылки на столбец:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Удаление столбцов

Чтобы удалить столбцы, можно опустить столбцы во время выбора или select(*) except использовать drop метод:

df_customer_flag_renamed.drop("balance_flag_renamed")

Вы также можете одновременно удалить несколько столбцов:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Операции строк

Spark предоставляет множество основных операций строк:

Фильтровать строки

Чтобы отфильтровать строки, используйте filter метод или where метод в кадре данных, чтобы возвращать только определенные строки. Чтобы определить столбец для фильтрации, используйте col метод или выражение, которое вычисляет столбец.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Чтобы отфильтровать несколько условий, используйте логические операторы. Например, & и | разрешить вам AND и OR условия соответственно. В следующем примере выполняется фильтрация строк, в которых c_nationkey значение равно 20 и c_acctbal больше 1000.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Удаление повторяющихся строк

Для отмены повторяющихся строк используйте distinctэтот параметр, который возвращает только уникальные строки.

df_unique = df_customer.distinct()

Обработка значений NULL

Чтобы обрабатывать значения NULL, удалите строки, содержащие значения NULL с помощью na.drop метода. Этот метод позволяет указать, нужно ли удалять строки, any содержащие значения NULL или all значения NULL.

Чтобы удалить значения NULL, используйте любой из следующих примеров.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Если вместо этого требуется отфильтровать только строки, содержащие все значения NULL, используйте следующее:

df_customer_no_nulls = df_customer.na.drop("all")

Это можно применить для подмножества столбцов, указав это, как показано ниже:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Чтобы заполнить отсутствующие значения, используйте fill метод. Вы можете применить это ко всем столбцам или подмножества столбцов. В приведенном ниже примере балансы учетных записей с пустым значением для баланса c_acctbal учетной записи заполняются 0.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Чтобы заменить строки другими значениями, используйте replace этот метод. В приведенном ниже примере все пустые строки адресов заменяются словом UNKNOWN:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Добавление строк

Чтобы добавить строки, необходимо использовать union метод для создания нового кадра данных. В следующем примере кадр данных, созданный ранее и df_filtered_customer объединенный, возвращает кадр данных df_that_one_customer с тремя клиентами:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Примечание.

Вы также можете объединить кадры данных, записав их в таблицу, а затем добавив новые строки. Для рабочих нагрузок добавочная обработка источников данных в целевую таблицу может значительно сократить задержку и затраты на вычисления по мере увеличения размера данных. Ознакомьтесь с данными в Databricks lakehouse.

Сортировать строки

Внимание

Сортировка может быть дорогой в масштабе, и если вы храните отсортированные данные и перезагрузите данные с помощью Spark, порядок не гарантируется. Убедитесь, что вы намеренно используете сортировку.

Чтобы отсортировать строки по одному или нескольким столбцам, используйте sort или orderBy метод. По умолчанию эти методы сортируются по возрастанию:

df_customer.orderBy(col("c_acctbal"))

Чтобы отфильтровать по убыванию, используйте desc:

df_customer.sort(col("c_custkey").desc())

В следующем примере показано, как сортировать по двум столбцам:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

Чтобы ограничить количество строк, возвращаемых после сортировки кадра данных, используйте limit этот метод. В следующем примере отображаются только первые 10 результаты:

display(df_sorted.limit(10))

Присоединение к кадрам данных

Чтобы присоединиться к двум или нескольким кадрам данных, используйте join этот метод. Вы можете указать, как нужно, чтобы кадры данных были присоединены к how (типу соединения) и on (на каких столбцах следует основывать параметры соединения). Распространенные типы соединения включают:

  • inner: это тип соединения по умолчанию, который возвращает кадр данных, который сохраняет только строки, в которых есть совпадение для on параметра в кадрах данных.
  • left: это сохраняет все строки первого указанного кадра данных и только строки из второго указанного кадра данных, которые имеют совпадение с первым.
  • outer: внешнее соединение сохраняет все строки из обоих кадров данных независимо от соответствия.

Подробные сведения о соединениях см. в статье "Работа с соединениями в Azure Databricks". Список соединений, поддерживаемых в PySpark, см. в разделе "Соединения с кадрами данных".

В следующем примере возвращается один кадр данных, в котором каждая строка кадра orders данных присоединена к соответствующей строке из customers кадра данных. Используется внутреннее соединение, так как ожидается, что каждый заказ соответствует ровно одному клиенту.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Чтобы присоединиться к нескольким условиям, используйте логические операторы, такие как & и | для указания AND и ORсоответственно. В следующем примере добавляется дополнительное условие, фильтрация только к строкам, имеющим o_totalprice больше 500,000:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Статистические данные

Чтобы агрегировать данные в кадре данных, аналогичном GROUP BY sql, используйте groupBy метод, чтобы указать столбцы для группировки по и agg методу, чтобы указать агрегаты. Импорт общих агрегатов, включая avg, summaxи min из pyspark.sql.functions. В следующем примере показан средний баланс клиентов по сегменту рынка:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Некоторые агрегаты — это действия, что означает, что они активируют вычисления. В этом случае вам не нужно использовать другие действия для вывода результатов.

Чтобы подсчитать строки в кадре данных, используйте count метод:

df_customer.count()

Вызовы цепочки

Методы, преобразующие кадры данных, возвращают кадры данных, и Spark не действует на преобразования, пока не будут вызваны действия. Эта отложенная оценка означает, что можно прицепить несколько методов для удобства и удобочитаемости. В следующем примере показано, как выполнять фильтрацию, агрегирование и упорядочение:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

Визуализация кадра данных

Чтобы визуализировать кадр данных в записной книжке, щелкните + знак рядом с таблицей слева от кадра данных, а затем выберите визуализацию , чтобы добавить одну или несколько диаграмм на основе кадра данных. Дополнительные сведения о визуализациях см. в разделе "Визуализации" в записных книжках Databricks.

display(df_order)

Для выполнения дополнительных визуализаций Databricks рекомендует использовать API pandas для Spark. Позволяет .pandas_api() приведения к соответствующему API pandas для кадра данных Spark. Дополнительные сведения см. в разделе API Pandas в Spark.

Сохранение данных

После преобразования данных его можно сохранить с помощью DataFrameWriter методов. Полный список этих методов можно найти в DataFrameWriter. В следующих разделах показано, как сохранить кадр данных в виде таблицы и в виде коллекции файлов данных.

Сохранение кадра данных в виде таблицы

Чтобы сохранить кадр данных в качестве таблицы в каталоге Unity, используйте write.saveAsTable метод и укажите путь в формате <catalog-name>.<schema-name>.<table-name>.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

Запись кадра данных в формате CSV

Чтобы записать кадр данных *.csv в формат, используйте write.csv этот метод, указав формат и параметры. По умолчанию, если данные существуют по указанному пути, операция записи завершается ошибкой. Чтобы выполнить другое действие, можно указать один из следующих режимов:

  • overwrite перезаписывает все существующие данные в целевом пути с содержимым кадра данных.
  • append добавляет содержимое кадра данных к данным в целевом пути.
  • ignore Автоматически завершается сбоем записи, если данные существуют в целевом пути.

В следующем примере показано перезапись данных с содержимым кадра данных в виде CSV-файлов:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Следующие шаги

Дополнительные возможности Spark в Databricks см. в следующем разделе: