rozhraní API funkcí pandas
Rozhraní API funkcí pandas umožňují přímo použít nativní funkci Pythonu, která přebírá a vypíše instance pandas do datového rámce PySpark. Podobně jako uživatelem definované funkce pandas, rozhraní API funkcí také používají Apache Arrow k přenosu dat a pandas pro práci s daty; Nápovědy k typům Pythonu jsou však v rozhraních API funkcí pandas volitelné.
Existují tři typy rozhraní API funkcí pandas:
- Seskupené mapy
- Mapu
- Spoluskupovaná mapa
Rozhraní API funkcí pandas využívají stejnou interní logiku, kterou používá spouštění uživatelem uživatele pandas. Sdílejí charakteristiky, jako jsou PyArrow, podporované typy SQL a konfigurace.
Další informace najdete v blogovém příspěvku New Pandas UDF and Python Type Hints v nadcházející verzi Apache Sparku 3.0.
Seskupené mapy
Seskupená data transformujete pomocí groupBy().applyInPandas()
vzoru "split-apply-combine". Split-apply-combine se skládá ze tří kroků:
- Rozdělte data do skupin pomocí
DataFrame.groupBy
. - Použijte funkci pro každou skupinu. Vstup i výstup funkce jsou
pandas.DataFrame
. Vstupní data obsahují všechny řádky a sloupce pro každou skupinu. - Zkombinujte výsledky do nového
DataFrame
.
Pokud chcete použít groupBy().applyInPandas()
, musíte definovat následující:
- Funkce Pythonu, která definuje výpočet pro každou skupinu
- Objekt
StructType
nebo řetězec, který definuje schéma výstupuDataFrame
Popisky vrácených pandas.DataFrame
sloupců musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadané jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, pokud ne řetězce, například celočíselné indexy. Viz pandas. Datový rámec pro popisky sloupců při vytváření pandas.DataFrame
.
Všechna data pro skupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám kvůli nedostatku paměti, zejména pokud jsou velikosti skupin nerovnoměrné. Konfigurace pro maxRecordsPerBatch se u skupin nepoužívá a je na vás, abyste zajistili, že se seskupovaná data vejdou do dostupné paměti.
Následující příklad ukazuje, jak použít groupby().apply()
k odečtení střední hodnoty od každé hodnoty ve skupině.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Podrobné informace o využití najdete v tématu pyspark.sql.GroupedData.applyInPandas.
Mapu
Operace mapování s instancemi pandas provádíte podle DataFrame.mapInPandas()
, aby bylo možné transformovat iterátor pandas.DataFrame
na jiný iterátor pandas.DataFrame
, který představuje aktuální datový rámec PySpark a vrátí výsledek jako datový rámec PySpark.
Základní funkce přebírá a vypíše iterátor .pandas.DataFrame
Může vrátit výstup libovolné délky v kontrastu s některými funkcemi UDF pandas, jako jsou řady na řady.
Následující příklad ukazuje, jak používat mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Podrobné informace o využití najdete v tématu pyspark.sql.DataFrame.mapInPandas.
Spoluskupovaná mapa
Pro operace mapování ve skupině s instancemi pandas použijte DataFrame.groupby().cogroup().applyInPandas()
ke spoluskupování dvou PySparků DataFrame
pomocí společného klíče a pak použijte funkci Pythonu pro každou spoluskupinu, jak je znázorněno na obrázku:
- Promíchejte data tak, aby byly skupiny každého datového rámce, které sdílejí klíč, společně seskupené.
- Použijte funkci pro každou spoluskupinu. Vstup funkce je dva
pandas.DataFrame
(s volitelnou kolekcí členů představující klíč). Výstupem funkce jepandas.DataFrame
. - Zkombinujte
pandas.DataFrame
s ze všech skupin do nového PySparkuDataFrame
.
Pokud chcete použít groupBy().cogroup().applyInPandas()
, musíte definovat následující:
- Funkce Pythonu, která definuje výpočet pro každou skupinu.
- Objekt
StructType
nebo řetězec, který definuje schéma výstupu PySparkDataFrame
.
Popisky vrácených pandas.DataFrame
sloupců musí buď odpovídat názvům polí v definovaném výstupním schématu, pokud jsou zadané jako řetězce, nebo musí odpovídat datovým typům polí podle pozice, pokud ne řetězce, například celočíselné indexy. Viz pandas. Datový rámec pro popisky sloupců při vytváření pandas.DataFrame
.
Všechna data pro spoluskupinu se načtou do paměti před tím, než se funkce použije. To může vést k výjimkám kvůli nedostatku paměti, zejména pokud jsou velikosti skupin nerovnoměrné. Konfigurace pro maxRecordsPerBatch se nepoužije a je na vás, abyste zajistili, že se spoluskupovaná data vejdou do dostupné paměti.
Následující příklad ukazuje, jak použít groupby().cogroup().applyInPandas()
k provedení mezi asof join
dvěma datovými sadami.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Podrobné informace o využití najdete v tématu pyspark.sql.PandasCogroupedOps.applyInPandas.