Sdílet prostřednictvím


Funkce pandas definované uživatelem

Uživatelem definovaná funkce pandas (UDF) (označovaná také jako vektorizovaná funkce definovaná uživatelem) je uživatelem definovaná funkce, která k přenosu dat a knihovny pandas používá Apache Arrow k práci s daty. Funkce definované uživatelem pandas umožňují vektorizované operace, které můžou zvýšit výkon až o 100x v porovnání s uživatelem definovanými uživatelem Pythonu v řádcích a časem.

Základní informace najdete v blogovém příspěvku New Pandas UDF a Python Type Hints v nadcházející verzi Apache Sparku 3.0.

Pomocí klíčového slova pandas_udf jako dekorátoru definujete UDF knihovny pandas a funkci zabalíte pomocí nápovědy k typu Pythonu. Tento článek popisuje různé typy funkcí definovaných uživatelem v knihovně pandas a ukazuje, jak používat uživatelem definované funkce pandas s nápovědou k typům.

Řada na řadu UDF

K vektorizaci skalárních operací použijete sadu Series to Series to Series pandas UDF. Můžete je použít s rozhraními API, jako select jsou a withColumn.

Funkce Pythonu by měla jako vstup použít řadu pandas a vrátit sadu pandas Series se stejnou délkou a měli byste ji zadat v nápovědě k typu Pythonu. Spark provede pandas UDF rozdělením dat v columns do dávek, vyvoláním funkce pro každou dávku jako pro podmnožinu dat a následným sloučením výsledků.

Následující příklad ukazuje, jak vytvořit pandas UDF, který vypočítá součin 2 columns.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterátor řady iterátoru řady UDF

Funkce definovaná uživatelem iterátoru je stejná jako skalární knihovna UDF pandas s výjimkou:

  • Funkce Pythonu
    • Vezme jako vstup iterátor dávek místo jedné vstupní dávky.
    • Vrátí iterátor výstupních dávek místo jedné výstupní dávky.
  • Délka celého výstupu v iterátoru by měla být stejná jako délka celého vstupu.
  • Zabalená pandas UDF jako vstup přebírá jeden Spark column.

Měli byste zadat nápovědu pro typ Pythonu jako Iterator[pandas.Series] ->Iterator[pandas.Series].

Tato funkce UDF knihovny pandas je užitečná, když spuštění UDF vyžaduje inicializaci některého stavu, například načtení souboru modelu strojového učení, aby se použil odvozování pro každou vstupní dávku.

Následující příklad ukazuje, jak vytvořit UDF pandas s podporou iterátoru.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterátor více řad iterátoru UDF řady

Iterátor více řad iterátoru iterátoru UDF řady má podobné vlastnosti a omezení jako Iterátor řad iterátoru řady UDF. Zadaná funkce přebírá iterátor dávek a vypíše iterátor dávek. Je také užitečné, když spuštění UDF vyžaduje inicializaci nějakého stavu.

Rozdíly jsou:

  • Základní funkce Pythonu přebírá iterátor řazené kolekce členů knihovny pandas Series.
  • Zabalená UDF pandas bere jako vstup několik Spark columns.

Nápovědu typu zadáte jako Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Řady pro skalární definovanou uživatelem

UDF skalární knihovny pandas se podobají agregačním funkcím Sparku. A Series to scalar pandas UDF definuje agregaci z jedné nebo více pandas Series na skalární hodnotu, where každá řada pandas představuje Spark column. Pomocí řady můžete skalární knihovnu pandas UDF používat s rozhraními API, jako jsou select, withColumn, groupBy.agga pyspark.sql.Window.

Tip typu vyjadřujete jako pandas.Series, ... ->Any. Návratový typ by měl být primitivním datovým typem a vrácený skalár může být primitivním typem Pythonu, například nebo int datovým typem NumPy, float například numpy.int64 nebo numpy.float64. Any v ideálním případě by měl být konkrétním skalárním typem.

Tento typ UDF nepodporuje částečnou agregaci a všechna data pro každou skupinu se načtou do paměti.

Následující příklad ukazuje, jak tento typ UDF použít k výpočtu průměru s select, groupBya window operace:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Podrobné informace o využití najdete v tématu pyspark.sql.functions.pandas_udf.

Využití

Nastavení velikosti dávky šipky

Poznámka:

Tato konfigurace nemá žádný vliv na výpočetní výkon nakonfigurovaný pomocí režimu sdíleného přístupu a databricks Runtime 13.3 LTS až 14.2.

Datové oddíly ve Sparku se převedou na dávky záznamů se šipkami, což může dočasně vést k vysokému využití paměti v prostředí JVM. Abyste se vyhnuli možným výjimkám z paměti, můžete upravit velikost dávek se šipkami tak, že nastavíte spark.sql.execution.arrow.maxRecordsPerBatch konfiguraci na celé číslo, které určuje maximální počet řádků pro každou dávku. Výchozí hodnota je 10 000 záznamů na dávku. Pokud je počet columns velký, měla by se hodnota odpovídajícím způsobem upravit. Při použití tohoto limitse každá datová partition rozdělí do 1 nebo více dávek záznamů pro zpracování.

Časové razítko sémantikou časového pásma

Spark interně ukládá časová razítka jako UTC valuesa data časového razítka přenesená bez zadaného časového pásma se převedou jako místní čas na UTC s rozlišením mikrosekund.

Při exportu nebo zobrazení dat časového razítka ve Sparku se časové pásmo relace používá k lokalizaci časového razítka values. Časové pásmo relace je set s konfigurací spark.sql.session.timeZone a výchozí hodnotou je místní časové pásmo systému JVM. Knihovna pandas používá typ datetime64 s rozlišením na nanosekundy, datetime64[ns], a volitelné časové pásmo je nastavováno podlecolumn.

Při přenosu dat časového razítka ze Sparku do pandas se tato data převedou na nanosekundy a každý column se nejprve převede podle časového pásma relace Sparku. Poté se lokalizuje do tohoto časového pásma, čímž se časové pásmo odebere a values se zobrazí jako místní čas. K tomu dochází při volání toPandas() nebo pandas_udf s časovým razítkem columns.

Při přenosu dat časového razítka z pandas do Sparku se převedou na mikrosekundy UTC. K tomu dochází při volání createDataFrame s datovým rámcem pandas nebo při vrácení časového razítka z uživatelem definovaného uživatelem pandas. Tyto převody se provádějí automaticky, aby Spark měl data v očekávaném formátu, takže není nutné provádět žádné z těchto převodů sami. Všechny nanosekundové values jsou zkráceny.

Standardní UDF načte data časového razítka jako objekty data a času Pythonu, které se liší od časového razítka pandas. Pokud chcete get nejlepšího výkonu, doporučujeme při práci s časovými razítky v UDF knihovny pandas použít funkci časové řady pandas. Podrobnosti najdete v tématu Funkce časové řady a data.

Příklad poznámkového bloku

Následující poznámkový blok znázorňuje vylepšení výkonu, která můžete dosáhnout pomocí uživatelem definovaných funkcí pandas:

Poznámkový blok srovnávacího testu UDF pandas

Get poznámkový blok