API-интерфейсы функций pandas
API pandas функций позволяют напрямую применять собственную функцию Python, которая принимает и выводит экземпляры pandas в DataFrame PySpark. Как и пользовательские функции pandas, API-интерфейсы функций также используют Apache Arrow для передачи данных и pandas для работы с данными; однако указания типов Python являются необязательными в API-интерфейсах функций pandas.
Существует три типа API-интерфейсов функций Pandas:
- Сгруппированная карта
- Карта
- Согруппированная карта
API-интерфейсы функций pandas используют ту же внутреннюю логику, что использует выполнение UDF pandas. Они обладают общими характеристиками, такими как PyArrow, поддерживаемые типы SQL и конфигурации.
Дополнительные сведения см. в записи блога новые пользовательские функции Pandas и подсказки типов Python в предстоящем выпуске Apache Spark 3.0.
Сгруппированная карта
Вы преобразуете сгруппированные данные с помощью groupBy().applyInPandas()
для реализации шаблона split-apply-combine. Процесс "Разделить-применить-объединить" состоит из трех этапов:
- Разделите данные на группы с помощью
DataFrame.groupBy
. - Примените функцию на каждой группе. Входные и выходные данные функции обе имеют значение
pandas.DataFrame
. Входные данные содержат все строки и столбцы для каждой группы. - Объедините результаты в новый
DataFrame
.
Чтобы использовать groupBy().applyInPandas()
, необходимо определить следующее:
- Функция Python, определяющая вычисления для каждой группы
- Объект
StructType
или строка, определяющая схему выходныхDataFrame
Метки столбцов возвращаемого pandas.DataFrame
должны либо совпадать с именами полей в определенной схеме выходных данных, если они указаны в виде строк, либо соответствовать типам данных поля по позиции, если они не являются строками, например, используя целочисленные индексы. См. pandas.DataFrame, как обозначить столбцы при создании pandas.DataFrame
.
Все данные группы загружаются в память перед применением функции. Это может привести к ошибкам нехватки памяти, особенно если размеры группы искажены. Конфигурация для maxRecordsPerBatch не применяется к группам, поэтому вам нужно убедиться, что сгруппированные данные помещаются в доступную память.
В следующем примере показано, как использовать groupby().apply()
для вычитания среднего значения из каждого значения в группе.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Подробные сведения об использовании см. pyspark.sql.GroupedData.applyInPandas.
Карта
Вы выполняете операции отображения с экземплярами pandas с помощью DataFrame.mapInPandas()
для преобразования итератора pandas.DataFrame
в другой итератор pandas.DataFrame
, который представляет текущий кадр данных PySpark и возвращает результат как кадр данных PySpark.
Базовая функция принимает и выводит итератор pandas.DataFrame
. Он может возвращать выходные данные произвольной длины, в отличие от некоторых определяемых пользователем функций (UDF) в pandas, таких как Series to Series.
В следующем примере показано, как использовать mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Подробности использования см. в разделе pyspark.sql.DataFrame.mapInPandas.
Согруппированная карта
Для совместно группированных операций сопоставления с экземплярами pandas используйте DataFrame.groupby().cogroup().applyInPandas()
для когруппирования двух PySpark DataFrame
по общему ключу, а затем примените функцию Python к каждой когруппе.
- Перемешайте данные таким образом, чтобы группы каждого фрейма данных, которые используют общий ключ, были сгруппированы вместе.
- Примените функцию к каждой согруппе. Входными параметрами функции являются две
pandas.DataFrame
(с необязательным кортежем, представляющим ключ). Выходные данные функции — этоpandas.DataFrame
. - Объедините
pandas.DataFrame
из всех групп в новый PySparkDataFrame
.
Чтобы использовать groupBy().cogroup().applyInPandas()
, необходимо определить следующее:
- Функция Python, определяющая вычисления для каждой согруппы.
- Объект
StructType
или строка, определяющая схему выходных данных для PySparkDataFrame
.
Метки столбцов возвращаемого pandas.DataFrame
должны совпадать с названиями полей в определенной схеме выходных данных, если они указаны в виде строк, или соответствовать типам данных полей по их позиции, если это не строки, например, целочисленные индексы. См. pandas.DataFrame, чтобы узнать, как пометить столбцы при создании pandas.DataFrame
.
Все данные для согруппы загружаются в память перед применением функции. Это может привести к ошибкам нехватки памяти, особенно если размеры группы искажены. Конфигурация maxRecordsPerBatch не применяется, поэтому вам нужно убедиться, что косгруппированные данные помещаются в доступную память.
В следующем примере показано, как использовать groupby().cogroup().applyInPandas()
для выполнения asof join
между двумя наборами данных.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Подробные сведения по использованию см. в разделе pyspark.sql.PandasCogroupedOps.applyInPandas.