API de función de Pandas
Las API de funciones de Pandas permiten aplicar directamente una función nativa de Python, que toma y genera instancias de Pandas a un elemento DataFrame de PySpark. De forma similar a las funciones definidas por el usuario de Pandas, las API de funciones también usan Apache Arrow para transferir datos y Pandas para trabajar con ellos; pero, las sugerencias de tipo de Python son opcionales en las API de funciones de Pandas.
Hay tres tipos de API de funciones de Pandas:
- Mapa agrupado
- Map
- Mapa coagrupado
Las API de funciones de Pandas sacan provecho de la misma lógica interna que usan las ejecuciones de UDF de Pandas. Comparten características como PyArrow, tipos de SQL admitidos y las configuraciones.
Para más información, vea la entrada de blog New Pandas UDF and Python Type Hints in the Upcoming Release of Apache Spark 3.0 (Nuevas UDF de Pandas y sugerencias de tipo de Python en la próxima versión de Apache Spark 3.0).
Mapa agrupado
Los datos agrupados se transforman mediante groupBy().applyInPandas()
para implementar el patrón "dividir-aplicar-combinar". Este patrón consta de tres pasos:
- Los datos se dividen en grupos mediante
DataFrame.groupBy
. - Se aplica una función en cada grupo. La entrada y la salida de la función son
pandas.DataFrame
. Los datos de entrada contienen todas las filas y columnas de cada grupo. - Los resultados se combinan en un nuevo objeto
DataFrame
.
Para usar groupBy().applyInPandas()
, tendrá que definir lo siguiente:
- Una función de Python que defina el cálculo para cada grupo
- Un objeto
StructType
o una cadena que defina el esquema de la salidaDataFrame
Las etiquetas de columna del elemento pandas.DataFrame
devuelto deben coincidir con los nombres de campo del esquema de salida definido si se especifican como cadenas, o bien con los tipos de datos de campo por posición si no son cadenas, por ejemplo, índices enteros. Vea pandas.DataFrame para saber cómo etiquetar columnas al construir un objeto pandas.DataFrame
.
Todos los datos de un grupo se cargan en memoria antes de aplicar la función. Esto puede provocar excepciones de memoria insuficiente, especialmente si los tamaños de grupo están sesgados. La configuración de maxRecordsPerBatch no se aplica a los grupos y tendrá que asegurarse de que los datos agrupados se ajusten a la memoria disponible.
En el ejemplo siguiente se muestra cómo usar groupby().apply()
para restar la media de cada valor del grupo.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Para obtener el uso detallado, vea pyspark.sql.GroupedData.applyInPandas.
Map
Las operaciones de asignación con instancias de Pandas se realizan mediante DataFrame.mapInPandas()
para transformar un iterador de pandas.DataFrame
en otro iterador de pandas.DataFrame
que representa el objeto DataFrame de PySpark actual y devuelve el resultado como un objeto DataFrame de PySpark.
La función subyacente toma y genera un iterador de pandas.DataFrame
. Puede devolver la salida de longitud arbitraria, a diferencia de algunos UDF de Pandas, como los de Series a Series.
En el siguiente ejemplo se muestra cómo usar el método mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Para obtener el uso detallado, vea pyspark.sql.DataFrame.mapInPandas.
Mapa coagrupado
En el caso de las operaciones de mapa coagrupado con instancias de Pandas, use DataFrame.groupby().cogroup().applyInPandas()
para que dos objetos DataFrame
de PySpark se agrupen mediante una clave común y, después, aplique una función de Python a cada grupo, como se muestra aquí:
- Aplique un orden aleatorio a los datos de forma que los grupos de cada objeto DataFrame que comparten una clave se agrupen de manera conjunta.
- Aplique una función a cada grupo. La entrada de la función son dos elementos
pandas.DataFrame
(con una tupla opcional que representa la clave). La salida de la función es un elementopandas.DataFrame
. - Combine los objetos
pandas.DataFrame
de todos los grupos en un nuevo objetoDataFrame
de PySpark.
Para usar groupBy().cogroup().applyInPandas()
, tendrá que definir lo siguiente:
- Una función de Python que defina el cálculo para cada grupo.
- Un objeto
StructType
o una cadena que defina el esquema del objetoDataFrame
de PySpark de salida.
Las etiquetas de columna del elemento pandas.DataFrame
devuelto deben coincidir con los nombres de campo del esquema de salida definido si se especifican como cadenas, o bien con los tipos de datos de campo por posición si no son cadenas, por ejemplo, índices enteros. Vea pandas.DataFrame para saber cómo etiquetar columnas al construir un objeto pandas.DataFrame
.
Todos los datos de un grupo conjunto se cargan en memoria antes de aplicar la función. Esto puede provocar excepciones de memoria insuficiente, especialmente si los tamaños de grupo están sesgados. La configuración de maxRecordsPerBatch no se aplica y tendrá que asegurarse de que los datos agrupados se ajusten a la memoria disponible.
En el ejemplo siguiente se muestra cómo usar groupby().cogroup().applyInPandas()
para realizar una operación asof join
entre dos conjuntos de datos.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Para obtener el uso detallado, vea pyspark.sql.PandasCogroupedOps.applyInPandas.