Delen via


Functie-API's van pandas

Met pandas-functie-API's kunt u rechtstreeks een systeemeigen Python-functie toepassen die pandas-exemplaren gebruikt en uitvoert op een PySpark DataFrame. Net als gebruikersgedefinieerde pandas-functies, gebruiken functie-API's ook Apache Arrow om gegevens over te dragen en pandas om met de gegevens te werken; Python type hints zijn echter optioneel in pandas-functie-API's.

Er zijn drie typen pandas-functie-API's:

  • Gegroepeerde kaart
  • Kaart
  • Gegroepeerde kaart

Pandas-functie-API's maken gebruik van dezelfde interne logica die door pandas UDF-uitvoering wordt gebruikt. Ze delen kenmerken zoals PyArrow, ondersteunde SQL-typen en de configuraties.

Zie het blogbericht Nieuwe Pandas UDF's en Python Type Hints in de aanstaande release van Apache Spark 3.0voor meer informatie.

Gegroepeerde kaart

U transformeert uw gegroepeerde gegevens met behulp van groupBy().applyInPandas() om het patroon split-apply-combine te implementeren. Split-apply-combine bestaat uit drie stappen:

  • Splits de gegevens in groepen met behulp van DataFrame.groupBy.
  • Een functie toepassen op elke groep. De invoer en uitvoer van de functie zijn beide pandas.DataFrame. De invoergegevens bevatten alle rijen en kolommen voor elke groep.
  • Combineer de resultaten in een nieuwe DataFrame.

Als u groupBy().applyInPandas()wilt gebruiken, moet u het volgende definiëren:

  • Een Python-functie die de berekening voor elke groep definieert
  • Een StructType-object of een tekenreeks die het schema van de uitvoer DataFrame definieert

De kolomlabels van de geretourneerde pandas.DataFrame moeten overeenkomen met de veldnamen in het gedefinieerde uitvoerschema, indien opgegeven als tekenreeksen, of overeenkomen met de veldgegevenstypen op positie als dat niet tekenreeksen zijn, bijvoorbeeld gehele getallen. Zie pandas.DataFrame voor hoe u kolommen labelt bij het maken van een pandas.DataFrame.

Alle gegevens voor een groep worden in het geheugen geladen voordat de functie wordt toegepast. Dit kan leiden tot uitzonderingen met onvoldoende geheugen, met name als de groepsgrootten scheef zijn. De configuratie voor maxRecordsPerBatch wordt niet toegepast op groepen en het is aan u om ervoor te zorgen dat de gegroepeerde gegevens in het beschikbare geheugen passen.

In het volgende voorbeeld ziet u hoe u groupby().apply() gebruikt om het gemiddelde van elke waarde in de groep af te trekken.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Zie pyspark.sql.GroupedData.applyInPandasvoor gedetailleerde gebruik.

Kaart

U voert kaartbewerkingen uit met pandas-exemplaren door DataFrame.mapInPandas() om een iterator van pandas.DataFrame te transformeren naar een andere iterator van pandas.DataFrame die het huidige PySpark DataFrame vertegenwoordigt en het resultaat retourneert als een PySpark DataFrame.

De onderliggende functie neemt een iterator van pandas.DataFrameen voert deze uit. Het kan uitvoer van willekeurige lengte retourneren in tegenstelling tot sommige pandas UDF's, zoals Series to Series.

In het volgende voorbeeld ziet u hoe u mapInPandas()gebruikt:

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Zie pyspark.sql.DataFrame.mapInPandasvoor gedetailleerde gebruik.

Gegroepeerde kaart

Voor gegroepeerde kaartbewerkingen met pandas-exemplaren gebruikt u DataFrame.groupby().cogroup().applyInPandas() om twee PySpark-DataFrames te groeperen met een gemeenschappelijke sleutel en past u vervolgens een Python-functie toe op elke cogroep, zoals wordt weergegeven:

  • Verdeel de gegevens zodanig dat de groepen van elk DataFrame die een sleutel delen, samen worden gegroepeerd.
  • Pas een functie toe op elke cogroep. De invoer van de functie is twee pandas.DataFrame (met een optionele tuple die de sleutel vertegenwoordigt). De uitvoer van de functie is een pandas.DataFrame.
  • Combineer de pandas.DataFrame’s van alle groepen tot een nieuwe PySpark-DataFrame.

Als u groupBy().cogroup().applyInPandas()wilt gebruiken, moet u het volgende definiëren:

  • Een Python-functie waarmee de berekening voor elke cogroep wordt gedefinieerd.
  • Een StructType-object of een tekenreeks waarmee het schema van de uitvoer pySpark-DataFramewordt gedefinieerd.

De kolomlabels van de geretourneerde pandas.DataFrame moeten overeenkomen met de veldnamen in het gedefinieerde uitvoerschema, indien opgegeven als tekenreeksen, of overeenkomen met de veldgegevenstypen op positie als dat niet tekenreeksen zijn, bijvoorbeeld gehele getallen. Zie pandas.DataFrame voor hoe u kolommen labelt bij het maken van een pandas.DataFrame.

Alle gegevens voor een cogroep worden in het geheugen geladen voordat de functie wordt toegepast. Dit kan leiden tot uitzonderingen met onvoldoende geheugen, met name als de groepsgrootten scheef zijn. De configuratie voor maxRecordsPerBatch wordt niet toegepast en het is aan u om ervoor te zorgen dat de gegroepeerde gegevens in het beschikbare geheugen passen.

In het volgende voorbeeld ziet u hoe u groupby().cogroup().applyInPandas() gebruikt om een asof join uit te voeren tussen twee gegevenssets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Zie pyspark.sql.PandasCogroupedOps.applyInPandasvoor gedetailleerde gebruik.