Поделиться через


API-интерфейсы функций pandas

API pandas функций позволяют напрямую применять собственную функцию Python, которая принимает и выводит экземпляры pandas в DataFrame PySpark. Как и пользовательские функции pandas, API-интерфейсы функций также используют Apache Arrow для передачи данных и pandas для работы с данными; однако указания типов Python являются необязательными в API-интерфейсах функций pandas.

Существует три типа API-интерфейсов функций Pandas:

  • Сгруппированная карта
  • Карта
  • Согруппированная карта

API-интерфейсы функций pandas используют ту же внутреннюю логику, что использует выполнение UDF pandas. Они обладают общими характеристиками, такими как PyArrow, поддерживаемые типы SQL и конфигурации.

Дополнительные сведения см. в записи блога новые пользовательские функции Pandas и подсказки типов Python в предстоящем выпуске Apache Spark 3.0.

Сгруппированная карта

Вы преобразуете сгруппированные данные с помощью groupBy().applyInPandas() для реализации шаблона split-apply-combine. Процесс "Разделить-применить-объединить" состоит из трех этапов:

  • Разделите данные на группы с помощью DataFrame.groupBy.
  • Примените функцию на каждой группе. Входные и выходные данные функции обе имеют значение pandas.DataFrame. Входные данные содержат все строки и столбцы для каждой группы.
  • Объедините результаты в новый DataFrame.

Чтобы использовать groupBy().applyInPandas(), необходимо определить следующее:

  • Функция Python, определяющая вычисления для каждой группы
  • Объект StructType или строка, определяющая схему выходных DataFrame

Метки столбцов возвращаемого pandas.DataFrame должны либо совпадать с именами полей в определенной схеме выходных данных, если они указаны в виде строк, либо соответствовать типам данных поля по позиции, если они не являются строками, например, используя целочисленные индексы. См. pandas.DataFrame, как обозначить столбцы при создании pandas.DataFrame.

Все данные группы загружаются в память перед применением функции. Это может привести к ошибкам нехватки памяти, особенно если размеры группы искажены. Конфигурация для maxRecordsPerBatch не применяется к группам, поэтому вам нужно убедиться, что сгруппированные данные помещаются в доступную память.

В следующем примере показано, как использовать groupby().apply() для вычитания среднего значения из каждого значения в группе.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Подробные сведения об использовании см. pyspark.sql.GroupedData.applyInPandas.

Карта

Вы выполняете операции отображения с экземплярами pandas с помощью DataFrame.mapInPandas() для преобразования итератора pandas.DataFrame в другой итератор pandas.DataFrame, который представляет текущий кадр данных PySpark и возвращает результат как кадр данных PySpark.

Базовая функция принимает и выводит итератор pandas.DataFrame. Он может возвращать выходные данные произвольной длины, в отличие от некоторых определяемых пользователем функций (UDF) в pandas, таких как Series to Series.

В следующем примере показано, как использовать mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Подробности использования см. в разделе pyspark.sql.DataFrame.mapInPandas.

Согруппированная карта

Для совместно группированных операций сопоставления с экземплярами pandas используйте DataFrame.groupby().cogroup().applyInPandas() для когруппирования двух PySpark DataFrameпо общему ключу, а затем примените функцию Python к каждой когруппе.

  • Перемешайте данные таким образом, чтобы группы каждого фрейма данных, которые используют общий ключ, были сгруппированы вместе.
  • Примените функцию к каждой согруппе. Входными параметрами функции являются две pandas.DataFrame (с необязательным кортежем, представляющим ключ). Выходные данные функции — это pandas.DataFrame.
  • Объедините pandas.DataFrameиз всех групп в новый PySpark DataFrame.

Чтобы использовать groupBy().cogroup().applyInPandas(), необходимо определить следующее:

  • Функция Python, определяющая вычисления для каждой согруппы.
  • Объект StructType или строка, определяющая схему выходных данных для PySpark DataFrame.

Метки столбцов возвращаемого pandas.DataFrame должны совпадать с названиями полей в определенной схеме выходных данных, если они указаны в виде строк, или соответствовать типам данных полей по их позиции, если это не строки, например, целочисленные индексы. См. pandas.DataFrame, чтобы узнать, как пометить столбцы при создании pandas.DataFrame.

Все данные для согруппы загружаются в память перед применением функции. Это может привести к ошибкам нехватки памяти, особенно если размеры группы искажены. Конфигурация maxRecordsPerBatch не применяется, поэтому вам нужно убедиться, что косгруппированные данные помещаются в доступную память.

В следующем примере показано, как использовать groupby().cogroup().applyInPandas() для выполнения asof join между двумя наборами данных.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Подробные сведения по использованию см. в разделе pyspark.sql.PandasCogroupedOps.applyInPandas.