Freigeben über


Pandas-Funktions-APIs

Mit Pandas-Funktions-APIs können Sie eine native Python-Funktion, die Pandas-Instanzen verwendet und ausgibt, direkt auf einen PySpark-DataFrame anwenden. Ähnlich wie benutzerdefinierte Pandas-Funktionen verwenden auch die Funktions-APIs Apache Arrow, um Daten und Pandas für die Arbeit mit den Daten zu übertragen. Python-Typhinweise sind jedoch in Pandas-Funktions-APIs optional.

Es gibt drei Arten von Pandas-Funktions-APIs:

  • Grouped Map
  • Map
  • Cogrouped Map

Pandas-Funktions-APIs nutzen dieselbe interne Logik, die auch bei der Ausführung von Pandas-UDFs verwendet wird. Sie verwenden die gleichen Eigenschaften, z. B. PyArrow, unterstützte SQL-Typen und die Konfigurationen.

Weitere Informationen finden Sie im Blogbeitrag Neue Pandas-UDFs und Python-Typhinweise in der nächsten Version von Apache Spark 3.0.

Grouped Map

Sie transformieren Ihre gruppierten Daten über groupBy().applyInPandas(), um das Muster „split-apply-combine“ (Aufteilen-Anwenden-Kombinieren) zu implementieren. „Split-apply-combine“ besteht aus drei Schritten:

  • Aufteilen der Daten in Gruppen mithilfe von DataFrame.groupBy.
  • Anwenden einer Funktion auf jede Gruppe. Die Eingabe und die Ausgabe der Funktion sind jeweils pandas.DataFrame. Die Eingabedaten enthalten alle Zeilen und Spalten für jede Gruppe.
  • Kombinieren der Ergebnisse in einem neuen DataFrame.

Zur Verwendung von groupBy().applyInPandas() müssen Sie Folgendes definieren:

  • Eine Python-Funktion, die die Berechnung für jede Gruppe definiert
  • Ein StructType-Objekt oder eine Zeichenfolge, die das Schema des ausgegebenen DataFrame definiert.

Die Spaltenbezeichnungen des zurückgegebenen pandas.DataFrame müssen entweder mit den Feldnamen im definierten Ausgabeschema übereinstimmen, wenn sie als Zeichenfolgen angegeben sind, oder mit den Felddatentypen nach Position übereinstimmen, wenn es sich nicht um Zeichenfolgen handelt, z. B. ganzzahlige Indizes. Informationen zum Bezeichnen von Spalten beim Erstellen eines pandas.DataFrame finden Sie unter pandas.DataFrame.

Alle Daten für eine Gruppe werden in den Arbeitsspeicher geladen, bevor die Funktion angewendet wird. Dies kann zu Speicherausnahmefehlern führen, insbesondere bei ungleichmäßigen Gruppengrößen. Die Konfiguration für maxRecordsPerBatch wird nicht auf Gruppen angewendet, und Sie müssen sicherstellen, dass die gruppierten Daten in den verfügbaren Arbeitsspeicher passen.

In dem folgenden Beispiel wird die Verwendung von groupby().apply() zum Subtrahieren des Mittelwerts von jedem Wert in der Gruppe veranschaulicht.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Ausführliche Informationen zur Verwendung finden Sie unter pyspark.sql.GroupedData.applyInPandas.

Map

Sie führen mit DataFrame.mapInPandas() Zuordnungsvorgänge mit Pandas-Instanzen durch, um einen Iterator von pandas.DataFrame in einen anderen Iterator von pandas.DataFrame zu transformieren, der den aktuellen PySpark-Datenrahmen darstellt und das Ergebnis als PySpark-Datenrahmen zurückgibt.

Die zugrunde liegende Funktion verwendet einen Iterator von pandas.DataFrame und gibt ihn aus. Im Gegensatz zu einigen Pandas-UDFs wie „Serie zu Serie“ können sie die Ausgabe in beliebiger Länge zurückgeben.

Das folgende Beispiel zeigt die Verwendung von mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Ausführliche Informationen zur Verwendung finden Sie unter pyspark.sql.DataFrame.mapInPandas.

Cogrouped Map

Für kogruppierte Zuordnungsvorgänge mit Pandas-Instanzen verwenden Sie DataFrame.groupby().cogroup().applyInPandas(), um zwei PySpark-DataFrames nach einem gemeinsamen Schlüssel zu kogruppieren und dann eine Python-Funktion auf jede Kogruppe anzuwenden (siehe unten):

  • Mischen (Shuffle) der Daten, sodass die Gruppen der einzelnen Datenrahmen (DataFrames), die einen gemeinsamen Schlüssel verwenden, zusammen gruppiert werden.
  • Anwenden einer Funktion auf jede Kogruppe. Die Eingabe der Funktion sind zwei pandas.DataFrames (mit einem optionalen Tupel, das den Schlüssel darstellt). Die Ausgabe der Funktion ist ein pandas.DataFrame.
  • Kombinieren der pandas.DataFrames aus allen Gruppen in einem neuen PySpark-DataFrame.

Zur Verwendung von groupBy().cogroup().applyInPandas() müssen Sie Folgendes definieren:

  • Eine Python-Funktion, die die Berechnung für jede Kogruppe definiert.
  • Ein StructType-Objekt oder eine Zeichenfolge, die das Schema des ausgegebenen PySpark-DataFrame definiert.

Die Spaltenbezeichnungen des zurückgegebenen pandas.DataFrame müssen entweder mit den Feldnamen im definierten Ausgabeschema übereinstimmen, wenn sie als Zeichenfolgen angegeben sind, oder mit den Felddatentypen nach Position übereinstimmen, wenn es sich nicht um Zeichenfolgen handelt, z. B. ganzzahlige Indizes. Informationen zum Bezeichnen von Spalten beim Erstellen eines pandas.DataFrame finden Sie unter pandas.DataFrame.

Alle Daten für eine Kogruppe werden in den Arbeitsspeicher geladen, bevor die Funktion angewendet wird. Dies kann zu Speicherausnahmefehlern führen, insbesondere bei ungleichmäßigen Gruppengrößen. Die Konfiguration für maxRecordsPerBatch wird nicht angewendet, und Sie müssen sicherstellen, dass die kogruppierten Daten in den verfügbaren Arbeitsspeicher passen.

In dem folgenden Beispiel wird die Verwendung von groupby().cogroup().applyInPandas() zum Durchführen eines asof join zwischen zwei Datasets veranschaulicht.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Ausführliche Informationen zur Verwendung finden Sie unter pyspark.sql.PandasCogroupedOps.applyInPandas.