API de fonctions pandas
Les API de fonction Pandas permettent d’appliquer directement une fonction native Python, qui prend et génère des instances Pandas, sur un DataFrame PySpark. Tout comme les fonctions Pandas définies par l’utilisateur, les API de fonction utilisent Apache Arrow pour transférer des données et Pandas pour travailler avec les données. Toutefois, les indicateurs de type Python sont facultatifs dans les API de fonction Pandas.
Il existe trois types d’API de fonction Pandas :
- Mappage groupé
- Mappage
- Mappage cogroupé
Les API de fonction Pandas tirent parti de même la logique interne que l’exécution de fonctions Pandas définies par l’utilisateur. Ils partagent des caractéristiques telles que PyArrow, les types SQL supportés, et les configurations.
Pour plus d’informations, consultez le billet de blog Nouvelles fonctions Pandas définies par l’utilisateur et indicateurs de type Python dans la prochaine version d’Apache Spark 3.0.
Mappage groupé
Vous transformez vos données groupées en utilisant groupBy().applyInPandas()
pour implémenter le modèle « fractionner-appliquer-combiner ». Le processus se déroule en trois étapes :
- Fractionner les données en groupes à l’aide de
DataFrame.groupBy
. - Appliquer une fonction sur chaque groupe. L’entrée et la sortie de la fonction sont toutes les deux des
pandas.DataFrame
. Les données d’entrée contiennent toutes les lignes et toutes les colonnes de chaque groupe. - Combiner les résultats dans un nouveau
DataFrame
.
Pour pouvoir utiliser groupBy().applyInPandas()
, vous devez définir les éléments suivants :
- Fonction Python qui définit le calcul pour chaque groupe
- Objet ou chaîne
StructType
qui définit le schéma duDataFrame
de sortie
Les étiquettes de colonnes du pandas.DataFrame
retourné doivent correspondre au nom des champs dans le schéma de sortie défini si elles sont spécifiées sous forme de chaînes, ou au type de données des champs par position si elles sont d’un autre type, par exemple des indices entiers. Pour savoir comment étiqueter des colonnes lors de la construction d’un pandas.DataFrame
, consultez pandas.DataFrame.
Toutes les données d’un groupe sont chargées en mémoire avant que la fonction ne soit appliquée. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les groupes sont de taille asymétrique. La configuration de maxRecordsPerBatch n’est pas appliquée sur les groupes. Il vous appartient de vérifier que les données groupées tiennent dans la mémoire disponible.
L’exemple suivant montre comment utiliser groupby().apply()
pour soustraire la moyenne de chaque valeur du groupe.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Pour plus d’informations sur l’utilisation, consultez pyspark.sql.GroupedData.applyInPandas.
Mappage
Les opérations de mappage sur des instances Pandas sont effectuées par DataFrame.mapInPandas()
afin de transformer un itérateur de pandas.DataFrame
en un autre itérateur de pandas.DataFrame
qui représente le DataFrame PySpark actuel et retourne le résultat sous la forme d’un DataFrame PySpark.
La fonction sous-jacente prend et génère un itérateur de pandas.DataFrame
. Elle peut retourner une sortie d’une longueur arbitraire, contrairement à certaines fonctions définies par l’utilisateur pandas comme la fonction de série à série.
L'exemple suivant montre comment utiliser mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Pour plus d’informations sur l’utilisation, consultez pyspark.sql.DataFrame.mapInPandas.
Mappage cogroupé
Pour les opérations de carte cogroupées avec des instances pandas, utilisezDataFrame.groupby().cogroup().applyInPandas()
pour cogrouper deux PySpark DataFrame
s par une clé commune et appliquez ensuite une fonction Python à chaque cogroupe comme indiqué :
- Brasser les données de sorte que les groupes de chaque DataFrame partageant une clé soient cogroupés.
- Appliquer une fonction à chaque cogroupe. La fonction prend en entrée deux
pandas.DataFrame
(avec un tuple facultatif représentant la clé). La sortie de la fonction estpandas.DataFrame
. - Combiner les
pandas.DataFrame
de tous les groupes dans un nouveauDataFrame
PySpark.
Pour pouvoir utiliser groupBy().cogroup().applyInPandas()
, vous devez définir les éléments suivants :
- Fonction Python qui définit le calcul pour chaque cogroupe
- Objet ou chaîne
StructType
qui définit le schéma duDataFrame
PySpark de sortie
Les étiquettes de colonnes du pandas.DataFrame
retourné doivent correspondre au nom des champs dans le schéma de sortie défini si elles sont spécifiées sous forme de chaînes, ou au type de données des champs par position si elles sont d’un autre type, par exemple des indices entiers. Pour savoir comment étiqueter des colonnes lors de la construction d’un pandas.DataFrame
, consultez pandas.DataFrame.
Toutes les données d’un cogroupe sont chargées en mémoire avant que la fonction ne soit appliquée. Cela peut entraîner des exceptions de mémoire insuffisante, en particulier si les groupes sont de taille asymétrique. La configuration de maxRecordsPerBatch n’est pas appliquée. Il vous appartient de vérifier que les données cogroupées tiennent dans la mémoire disponible.
L’exemple suivant montre comment utiliser groupby().cogroup().applyInPandas()
pour effectuer une opération asof join
entre deux jeux de données.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Pour plus d’informations sur l’utilisation, consultez pyspark.sql.PandasCogroupedOps.applyInPandas.