Compartir a través de


API de función de Pandas

Las API de funciones de Pandas permiten aplicar directamente una función nativa de Python, que toma y genera instancias de Pandas a un elemento DataFrame de PySpark. De forma similar a las funciones definidas por el usuario de Pandas, las API de funciones también usan Apache Arrow para transferir datos y Pandas para trabajar con ellos; pero, las sugerencias de tipo de Python son opcionales en las API de funciones de Pandas.

Hay tres tipos de API de funciones de Pandas:

  • Mapa agrupado
  • Map
  • Mapa coagrupado

Las API de funciones de Pandas sacan provecho de la misma lógica interna que usan las ejecuciones de UDF de Pandas. Comparten características como PyArrow, tipos de SQL admitidos y las configuraciones.

Para más información, vea la entrada de blog New Pandas UDF and Python Type Hints in the Upcoming Release of Apache Spark 3.0 (Nuevas UDF de Pandas y sugerencias de tipo de Python en la próxima versión de Apache Spark 3.0).

Mapa agrupado

Los datos agrupados se transforman mediante groupBy().applyInPandas() para implementar el patrón "dividir-aplicar-combinar". Este patrón consta de tres pasos:

  • Los datos se dividen en grupos mediante DataFrame.groupBy.
  • Se aplica una función en cada grupo. La entrada y la salida de la función son pandas.DataFrame. Los datos de entrada contienen todas las filas y columnas de cada grupo.
  • Los resultados se combinan en un nuevo objeto DataFrame.

Para usar groupBy().applyInPandas(), tendrá que definir lo siguiente:

  • Una función de Python que defina el cálculo para cada grupo
  • Un objeto StructType o una cadena que defina el esquema de la salida DataFrame

Las etiquetas de columna del elemento pandas.DataFrame devuelto deben coincidir con los nombres de campo del esquema de salida definido si se especifican como cadenas, o bien con los tipos de datos de campo por posición si no son cadenas, por ejemplo, índices enteros. Vea pandas.DataFrame para saber cómo etiquetar columnas al construir un objeto pandas.DataFrame.

Todos los datos de un grupo se cargan en memoria antes de aplicar la función. Esto puede provocar excepciones de memoria insuficiente, especialmente si los tamaños de grupo están sesgados. La configuración de maxRecordsPerBatch no se aplica a los grupos y tendrá que asegurarse de que los datos agrupados se ajusten a la memoria disponible.

En el ejemplo siguiente se muestra cómo usar groupby().apply() para restar la media de cada valor del grupo.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Para obtener el uso detallado, vea pyspark.sql.GroupedData.applyInPandas.

Map

Las operaciones de asignación con instancias de Pandas se realizan mediante DataFrame.mapInPandas() para transformar un iterador de pandas.DataFrame en otro iterador de pandas.DataFrame que representa el objeto DataFrame de PySpark actual y devuelve el resultado como un objeto DataFrame de PySpark.

La función subyacente toma y genera un iterador de pandas.DataFrame. Puede devolver la salida de longitud arbitraria, a diferencia de algunos UDF de Pandas, como los de Series a Series.

En el siguiente ejemplo se muestra cómo usar el método mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Para obtener el uso detallado, vea pyspark.sql.DataFrame.mapInPandas.

Mapa coagrupado

En el caso de las operaciones de mapa coagrupado con instancias de Pandas, use DataFrame.groupby().cogroup().applyInPandas() para que dos objetos DataFrame de PySpark se agrupen mediante una clave común y, después, aplique una función de Python a cada grupo, como se muestra aquí:

  • Aplique un orden aleatorio a los datos de forma que los grupos de cada objeto DataFrame que comparten una clave se agrupen de manera conjunta.
  • Aplique una función a cada grupo. La entrada de la función son dos elementos pandas.DataFrame (con una tupla opcional que representa la clave). La salida de la función es un elemento pandas.DataFrame.
  • Combine los objetos pandas.DataFrame de todos los grupos en un nuevo objeto DataFrame de PySpark.

Para usar groupBy().cogroup().applyInPandas(), tendrá que definir lo siguiente:

  • Una función de Python que defina el cálculo para cada grupo.
  • Un objeto StructType o una cadena que defina el esquema del objeto DataFrame de PySpark de salida.

Las etiquetas de columna del elemento pandas.DataFrame devuelto deben coincidir con los nombres de campo del esquema de salida definido si se especifican como cadenas, o bien con los tipos de datos de campo por posición si no son cadenas, por ejemplo, índices enteros. Vea pandas.DataFrame para saber cómo etiquetar columnas al construir un objeto pandas.DataFrame.

Todos los datos de un grupo conjunto se cargan en memoria antes de aplicar la función. Esto puede provocar excepciones de memoria insuficiente, especialmente si los tamaños de grupo están sesgados. La configuración de maxRecordsPerBatch no se aplica y tendrá que asegurarse de que los datos agrupados se ajusten a la memoria disponible.

En el ejemplo siguiente se muestra cómo usar groupby().cogroup().applyInPandas() para realizar una operación asof join entre dos conjuntos de datos.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Para obtener el uso detallado, vea pyspark.sql.PandasCogroupedOps.applyInPandas.