Pandas-Funktions-APIs
Mit Pandas-Funktions-APIs können Sie eine native Python-Funktion, die Pandas-Instanzen verwendet und ausgibt, direkt auf einen PySpark-DataFrame anwenden. Ähnlich wie benutzerdefinierte Pandas-Funktionen verwenden auch die Funktions-APIs Apache Arrow, um Daten und Pandas für die Arbeit mit den Daten zu übertragen. Python-Typhinweise sind jedoch in Pandas-Funktions-APIs optional.
Es gibt drei Arten von Pandas-Funktions-APIs:
- Grouped Map
- Map
- Cogrouped Map
Pandas-Funktions-APIs nutzen dieselbe interne Logik, die auch bei der Ausführung von Pandas-UDFs verwendet wird. Sie verwenden die gleichen Eigenschaften, z. B. PyArrow, unterstützte SQL-Typen und die Konfigurationen.
Weitere Informationen finden Sie im Blogbeitrag Neue Pandas-UDFs und Python-Typhinweise in der nächsten Version von Apache Spark 3.0.
Grouped Map
Sie transformieren Ihre gruppierten Daten über groupBy().applyInPandas()
, um das Muster „split-apply-combine“ (Aufteilen-Anwenden-Kombinieren) zu implementieren. „Split-apply-combine“ besteht aus drei Schritten:
- Aufteilen der Daten in Gruppen mithilfe von
DataFrame.groupBy
. - Anwenden einer Funktion auf jede Gruppe. Die Eingabe und die Ausgabe der Funktion sind jeweils
pandas.DataFrame
. Die Eingabedaten enthalten alle Zeilen und Spalten für jede Gruppe. - Kombinieren der Ergebnisse in einem neuen
DataFrame
.
Zur Verwendung von groupBy().applyInPandas()
müssen Sie Folgendes definieren:
- Eine Python-Funktion, die die Berechnung für jede Gruppe definiert
- Ein
StructType
-Objekt oder eine Zeichenfolge, die das Schema des ausgegebenenDataFrame
definiert.
Die Spaltenbezeichnungen des zurückgegebenen pandas.DataFrame
müssen entweder mit den Feldnamen im definierten Ausgabeschema übereinstimmen, wenn sie als Zeichenfolgen angegeben sind, oder mit den Felddatentypen nach Position übereinstimmen, wenn es sich nicht um Zeichenfolgen handelt, z. B. ganzzahlige Indizes. Informationen zum Bezeichnen von Spalten beim Erstellen eines pandas.DataFrame
finden Sie unter pandas.DataFrame.
Alle Daten für eine Gruppe werden in den Arbeitsspeicher geladen, bevor die Funktion angewendet wird. Dies kann zu Speicherausnahmefehlern führen, insbesondere bei ungleichmäßigen Gruppengrößen. Die Konfiguration für maxRecordsPerBatch wird nicht auf Gruppen angewendet, und Sie müssen sicherstellen, dass die gruppierten Daten in den verfügbaren Arbeitsspeicher passen.
In dem folgenden Beispiel wird die Verwendung von groupby().apply()
zum Subtrahieren des Mittelwerts von jedem Wert in der Gruppe veranschaulicht.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Ausführliche Informationen zur Verwendung finden Sie unter pyspark.sql.GroupedData.applyInPandas.
Map
Sie führen mit DataFrame.mapInPandas()
Zuordnungsvorgänge mit Pandas-Instanzen durch, um einen Iterator von pandas.DataFrame
in einen anderen Iterator von pandas.DataFrame
zu transformieren, der den aktuellen PySpark-Datenrahmen darstellt und das Ergebnis als PySpark-Datenrahmen zurückgibt.
Die zugrunde liegende Funktion verwendet einen Iterator von pandas.DataFrame
und gibt ihn aus. Im Gegensatz zu einigen Pandas-UDFs wie „Serie zu Serie“ können sie die Ausgabe in beliebiger Länge zurückgeben.
Das folgende Beispiel zeigt die Verwendung von mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Ausführliche Informationen zur Verwendung finden Sie unter pyspark.sql.DataFrame.mapInPandas.
Cogrouped Map
Für kogruppierte Zuordnungsvorgänge mit Pandas-Instanzen verwenden Sie DataFrame.groupby().cogroup().applyInPandas()
, um zwei PySpark-DataFrame
s nach einem gemeinsamen Schlüssel zu kogruppieren und dann eine Python-Funktion auf jede Kogruppe anzuwenden (siehe unten):
- Mischen (Shuffle) der Daten, sodass die Gruppen der einzelnen Datenrahmen (DataFrames), die einen gemeinsamen Schlüssel verwenden, zusammen gruppiert werden.
- Anwenden einer Funktion auf jede Kogruppe. Die Eingabe der Funktion sind zwei
pandas.DataFrame
s (mit einem optionalen Tupel, das den Schlüssel darstellt). Die Ausgabe der Funktion ist einpandas.DataFrame
. - Kombinieren der
pandas.DataFrame
s aus allen Gruppen in einem neuen PySpark-DataFrame
.
Zur Verwendung von groupBy().cogroup().applyInPandas()
müssen Sie Folgendes definieren:
- Eine Python-Funktion, die die Berechnung für jede Kogruppe definiert.
- Ein
StructType
-Objekt oder eine Zeichenfolge, die das Schema des ausgegebenen PySpark-DataFrame
definiert.
Die Spaltenbezeichnungen des zurückgegebenen pandas.DataFrame
müssen entweder mit den Feldnamen im definierten Ausgabeschema übereinstimmen, wenn sie als Zeichenfolgen angegeben sind, oder mit den Felddatentypen nach Position übereinstimmen, wenn es sich nicht um Zeichenfolgen handelt, z. B. ganzzahlige Indizes. Informationen zum Bezeichnen von Spalten beim Erstellen eines pandas.DataFrame
finden Sie unter pandas.DataFrame.
Alle Daten für eine Kogruppe werden in den Arbeitsspeicher geladen, bevor die Funktion angewendet wird. Dies kann zu Speicherausnahmefehlern führen, insbesondere bei ungleichmäßigen Gruppengrößen. Die Konfiguration für maxRecordsPerBatch wird nicht angewendet, und Sie müssen sicherstellen, dass die kogruppierten Daten in den verfügbaren Arbeitsspeicher passen.
In dem folgenden Beispiel wird die Verwendung von groupby().cogroup().applyInPandas()
zum Durchführen eines asof join
zwischen zwei Datasets veranschaulicht.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Ausführliche Informationen zur Verwendung finden Sie unter pyspark.sql.PandasCogroupedOps.applyInPandas.