Dela via


PANDAS-funktions-API:er

Med pandas-funktions-API:er kan du direkt tillämpa en infödd Python-funktion som tar och matar ut Pandas-instanser till en PySpark DataFrame. På samma sätt som pandas användardefinierade funktioneranvänder funktions-API:er även Apache Arrow- för att överföra data och pandas för att arbeta med data. Python-typtips är dock valfria i PANDAS-funktions-API:er.

Det finns tre typer av PANDAS-funktions-API:er:

  • Grupperad karta
  • Karta
  • Samordnad karta

Pandas-funktions-API:er använder samma interna logik som pandas UDF-körning använder. De delar egenskaper som PyArrow, SQL-typer som stöds och konfigurationerna.

Mer information finns i blogginlägget Nya Pandas UDF:er och Python Type Hints i nästa version av Apache Spark 3.0.

Grupperad karta

Du transformerar grupperade data med hjälp av groupBy().applyInPandas() för att implementera mönstret "split-apply-combine". Split-apply-combine består av tre steg:

  • Dela upp data i grupper med hjälp av DataFrame.groupBy.
  • Tillämpa en funktion på varje grupp. Både indata och utdata för funktionen är pandas.DataFrame. Indata innehåller alla rader och columns för varje grupp.
  • Kombinera resultaten till en ny DataFrame.

Om du vill använda groupBy().applyInPandas()måste du definiera följande:

  • En Python-funktion som definierar beräkningen för varje grupp
  • Ett StructType objekt eller en sträng som definierar schema av utdata DataFrame

De column-etiketterna för den returnerade pandas.DataFrame måste antingen matcha fältnamnen i den definierade utdata schema om de anges som strängar, eller matcha fältdatatyperna efter position om de inte är strängar, till exempel heltalsindex. Se pandas.DataFrame- för hur du etiketterar columns när du skapar en pandas.DataFrame.

Alla data för en grupp läses in i minnet innan funktionen tillämpas. Detta kan leda till undantag från minnet, särskilt om gruppstorlekarna är skeva. Konfigurationen för maxRecordsPerBatch tillämpas inte på grupper och det är upp till dig att se till att grupperade data passar in i det tillgängliga minnet.

I följande exempel visas hur du använder groupby().apply() för att subtrahera medelvärdet från varje värde i gruppen.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Detaljerad användning finns i pyspark.sql.GroupedData.applyInPandas.

Karta

Du utför kartåtgärder med Pandas-instanser genom att använda DataFrame.mapInPandas() för att omvandla en iterator av pandas.DataFrame till en annan iterator av pandas.DataFrame, som representerar den aktuella PySpark DataFrame och returnerar resultatet som en PySpark DataFrame.

Den underliggande funktionen tar och matar ut en iterator för pandas.DataFrame. Det kan returnera utdata av godtycklig längd i motsats till vissa Pandas UDF:er, till exempel Serie till serie.

I följande exempel visas hur du använder mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Detaljerad användning finns i pyspark.sql.DataFrame.mapInPandas.

Samordnad karta

För grupperade kartåtgärder med Pandas-instanser använder du DataFrame.groupby().cogroup().applyInPandas() för att gruppera två PySpark-DataFramemed en gemensam nyckel och sedan tillämpa en Python-funktion på varje grupp enligt följande:

  • Blanda data så att grupperna för varje DataFrame som delar en nyckel är grupperade tillsammans.
  • Tillämpa en funktion på varje kugrupp. Indata för funktionen är två pandas.DataFrame (med en valfri tuppeln som representerar nyckeln). Funktionens utdata är en pandas.DataFrame.
  • Kombinera pandas.DataFramefrån alla grupper till en ny PySpark DataFrame.

Om du vill använda groupBy().cogroup().applyInPandas()måste du definiera följande:

  • En Python-funktion som definierar beräkningen för varje samgrupp.
  • Ett StructType-objekt eller en sträng som definierar schema för PySparks utdata DataFrame.

De column etiketterna av den returnerade pandas.DataFrame måste antingen matcha fältnamnen i de definierade utdata schema om de är specificerade som strängar, eller matcha fältdatatyperna efter position om de inte är strängar, exempelvis heltalsindex. Se pandas.DataFrame för hur du märker columns när du skapar en pandas.DataFrame.

Alla data för en grupp läses in i minnet innan funktionen tillämpas. Detta kan leda till undantag från minnet, särskilt om gruppstorlekarna är skeva. Konfigurationen för maxRecordsPerBatch tillämpas inte och det är upp till dig att se till att de grupperade data passar in i det tillgängliga minnet.

I följande exempel visas hur du använder groupby().cogroup().applyInPandas() för att utföra en asof join mellan två datauppsättningar.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Detaljerad användning finns i pyspark.sql.PandasCogroupedOps.applyInPandas.