Sdílet prostřednictvím


rozhraní API funkcí pandas

Rozhraní API funkcí pandas umožňují přímo použít nativní funkci Pythonu, která přebírá a vypíše instance pandas do datového rámce PySpark. Podobně jako uživatelem definované funkce pandas, rozhraní API funkcí také používají Apache Arrow k přenosu dat a pandas pro práci s daty; Nápovědy k typům Pythonu jsou však v rozhraních API funkcí pandas volitelné.

Existují tři typy rozhraní API funkcí pandas:

  • Seskupené mapy
  • Mapu
  • Spoluskupovaná mapa

Rozhraní API funkcí pandas využívají stejnou interní logiku, kterou používá spouštění uživatelem uživatele pandas. Sdílejí charakteristiky, jako jsou PyArrow, podporované typy SQL a konfigurace.

Další informace najdete v blogovém příspěvku New Pandas UDF and Python Type Hints v nadcházející verzi Apache Sparku 3.0.

Seskupené mapy

Seskupená data transformujete pomocí groupBy().applyInPandas() vzoru "split-apply-combine". Split-apply-combine se skládá ze tří kroků:

  • Rozdělte data do skupin pomocí DataFrame.groupBy.
  • Použijte funkci pro každou skupinu. Vstup i výstup funkce jsou pandas.DataFrame. Vstupní data obsahují všechny řádky a sloupce pro každou skupinu.
  • Zkombinujte výsledky do nového DataFrame.

Pokud chcete použít groupBy().applyInPandas(), musíte definovat následující:

  • Funkce Pythonu, která definuje výpočet pro každou skupinu
  • Objekt StructType nebo řetězec, který definuje schéma výstupu DataFrame

Popisky vrácených pandas.DataFrame sloupců musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadané jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, pokud ne řetězce, například celočíselné indexy. Viz pandas. Datový rámec pro popisky sloupců při vytváření pandas.DataFrame.

Všechna data pro skupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám kvůli nedostatku paměti, zejména pokud jsou velikosti skupin nerovnoměrné. Konfigurace pro maxRecordsPerBatch se u skupin nepoužívá a je na vás, abyste zajistili, že se seskupovaná data vejdou do dostupné paměti.

Následující příklad ukazuje, jak použít groupby().apply() k odečtení střední hodnoty od každé hodnoty ve skupině.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

Podrobné informace o využití najdete v tématu pyspark.sql.GroupedData.applyInPandas.

Mapu

Operace mapování s instancemi pandas provádíte podle DataFrame.mapInPandas() , aby bylo možné transformovat iterátor pandas.DataFrame na jiný iterátor pandas.DataFrame , který představuje aktuální datový rámec PySpark a vrátí výsledek jako datový rámec PySpark.

Základní funkce přebírá a vypíše iterátor .pandas.DataFrame Může vrátit výstup libovolné délky v kontrastu s některými funkcemi UDF pandas, jako jsou řady na řady.

Následující příklad ukazuje, jak používat mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

Podrobné informace o využití najdete v tématu pyspark.sql.DataFrame.mapInPandas.

Spoluskupovaná mapa

Pro operace mapování ve skupině s instancemi pandas použijte DataFrame.groupby().cogroup().applyInPandas() ke spoluskupování dvou PySparků DataFramepomocí společného klíče a pak použijte funkci Pythonu pro každou spoluskupinu, jak je znázorněno na obrázku:

  • Promíchejte data tak, aby byly skupiny každého datového rámce, které sdílejí klíč, společně seskupené.
  • Použijte funkci pro každou spoluskupinu. Vstup funkce je dva pandas.DataFrame (s volitelnou kolekcí členů představující klíč). Výstupem funkce je pandas.DataFrame.
  • Zkombinujte pandas.DataFrames ze všech skupin do nového PySparku DataFrame.

Pokud chcete použít groupBy().cogroup().applyInPandas(), musíte definovat následující:

  • Funkce Pythonu, která definuje výpočet pro každou skupinu.
  • Objekt StructType nebo řetězec, který definuje schéma výstupu PySpark DataFrame.

Popisky vrácených pandas.DataFrame sloupců musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadané jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, pokud ne řetězce, například celočíselné indexy. Viz pandas. Datový rámec pro popisky sloupců při vytváření pandas.DataFrame.

Všechna data pro spoluskupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám kvůli nedostatku paměti, zejména pokud jsou velikosti skupin nerovnoměrné. Konfigurace pro maxRecordsPerBatch se nepoužije a je na vás, abyste zajistili, že se spoluskupovaná data vejdou do dostupné paměti.

Následující příklad ukazuje, jak použít groupby().cogroup().applyInPandas() k provedení mezi asof join dvěma datovými sadami.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

Podrobné informace o využití najdete v tématu pyspark.sql.PandasCogroupedOps.applyInPandas.