funções de API do pandas
As APIs de função pandas permitem que você aplique diretamente uma função nativa do Python que usa e gera instâncias de pandas para um DataFrame PySpark. Semelhante a pandas funções definidas pelo usuário, as APIs de função também usam de seta Apache para transferir dados e pandas para trabalhar com os dados; no entanto, as dicas de tipo Python são opcionais em APIs de função pandas.
Existem três tipos das APIs de função do pandas:
- Mapa agrupado
- Mapa
- Mapa coagrupado
As APIs da função pandas aproveitam a mesma lógica interna que a execução do pandas UDF usa. Eles compartilham características como PyArrow, tipos SQL suportados e as configurações.
Para obter mais informações, consulte a postagem do blog New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.
Mapa agrupado
Você transforma os seus dados agrupados usando groupBy().applyInPandas()
para implementar o modelo "split-apply-combine". O processo de dividir-aplicar-combinar consiste em três etapas:
- Divida os dados em grupos usando
DataFrame.groupBy
. - Aplique uma função em cada grupo. A entrada e a saída da função são ambas
pandas.DataFrame
. Os dados de entrada contêm todas as linhas de dados e incluem o columns para cada grupo. - Combine os resultados numa nova
DataFrame
.
Para usar groupBy().applyInPandas()
, você deve definir o seguinte:
- Uma função Python que define a computação para cada grupo
- Um objeto
StructType
ou uma cadeia de caracteres que define a schema da saídaDataFrame
Os rótulos de column do pandas.DataFrame
retornado devem corresponder aos nomes de campo na saída definida schema, se especificados como strings, ou, por exemplo, corresponder aos tipos de dados de campo por posição, se não forem strings, como índices inteiros. Veja pandas.DataFrame para rotular columns ao construir um pandas.DataFrame
.
Todos os dados de um grupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos do grupo forem desiguais. A configuração para maxRecordsPerBatch não é aplicada em grupos e cabe a você garantir que os dados agrupados caibam na memória disponível.
O exemplo a seguir mostra como usar groupby().apply()
para subtrair a média de cada valor no grupo.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Para uso detalhado, consulte pyspark.sql.GroupedData.applyInPandas.
Mapa
Você executa operações de mapa com instâncias pandas por DataFrame.mapInPandas()
para transformar um iterador de pandas.DataFrame
em outro iterador de pandas.DataFrame
, que representa o DataFrame PySpark atual e retorna o resultado como um DataFrame PySpark.
A função subjacente toma e produz um iterador de pandas.DataFrame
. Ele pode retornar saída de comprimento arbitrário em contraste com alguns pandas UDFs, como Series to Series.
O exemplo a seguir mostra como usar mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Para uso detalhado, consulte pyspark.sql.DataFrame.mapInPandas.
Mapa coagrupado
Para operações de mapa coagrupadas com instâncias pandas, use DataFrame.groupby().cogroup().applyInPandas()
para coagrupar dois DataFrame
s PySpark por uma chave comum e, em seguida, aplique uma função Python a cada cogrupo, conforme mostrado:
- Embaralhe os dados de modo que os grupos de cada DataFrame que compartilham uma chave sejam coagrupados.
- Aplique uma função a cada cogrupo. A entrada da função consiste em dois
pandas.DataFrame
(com uma tupla opcional que representa a chave). A saída da função é umpandas.DataFrame
. - Combine os
pandas.DataFrame
s de todos os grupos em um novoDataFrame
PySpark.
Para usar groupBy().cogroup().applyInPandas()
, você deve definir o seguinte:
- Uma função Python que define a computação para cada cogrupo.
- Um objeto
StructType
ou uma cadeia de caracteres que define a schema da saída PySparkDataFrame
.
Os rótulos de column do pandas.DataFrame
retornado devem corresponder aos nomes de campo na schema de saída definida, se especificado como strings, ou corresponder aos tipos de dados de campo por posição, se não strings, por exemplo, índices inteiros. Veja pandas. DataFrame como rotular columns ao construir um pandas.DataFrame
.
Todos os dados de um cogrupo são carregados na memória antes que a função seja aplicada. Isso pode levar a exceções de falta de memória, especialmente se os tamanhos do grupo estiverem desproporcionados. A configuração para maxRecordsPerBatch não é aplicada e cabe a você garantir que os dados coagrupados caibam na memória disponível.
O exemplo a seguir mostra como usar groupby().cogroup().applyInPandas()
para executar uma asof join
entre dois conjuntos de dados.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Para uso detalhado, consulte pyspark.sql.PandasCogroupedOps.applyInPandas.