Dela via


användardefinierade funktioner – pandas

En användardefinierad Pandas-funktion (UDF) – även kallad vektoriserad UDF – är en användardefinierad funktion som använder Apache Arrow för att överföra data och pandas för att arbeta med data. Pandas UDF:er tillåter vektoriserade åtgärder som kan öka prestanda upp till 100x jämfört med python-UDF:er rad i taget.

Bakgrundsinformation finns i blogginlägget Nya Pandas UDF:er och Python Type Hints i den kommande versionen av Apache Spark 3.0.

Du definierar en Pandas UDF med nyckelordet pandas_udf som dekoratör och omsluter funktionen med ett Tips av Python-typ. Den här artikeln beskriver de olika typerna av Pandas UDF:er och visar hur du använder Pandas UDF:er med typtips.

Serie till serie UDF

Du använder en UDF från serie till serie för att vektorisera skaläråtgärder. Du kan använda dem med API:er som select och withColumn.

Python-funktionen bör ta en Pandas-serie som indata och returnera en Pandas-serie med samma längd, och du bör ange dessa i Tips av Python-typ. Spark kör en Pandas UDF genom att dela upp kolumner i batchar, anropa funktionen för varje batch som en delmängd av data och sedan sammanfoga resultaten.

I följande exempel visas hur du skapar en Pandas UDF som beräknar produkten av 2 kolumner.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterator av serie till iterator av serie UDF

En iterator-UDF är samma som en skalär Pandas UDF förutom:

  • Python-funktionen
    • Tar en iterator av batchar i stället för en enda indatabatch som indata.
    • Returnerar en iterator för utdatabatcherna i stället för en enda utdatabatch.
  • Längden på hela utdata i iteratorn ska vara samma som längden på hela indata.
  • Den omslutna Pandas UDF tar en enda Spark-kolumn som indata.

Du bör ange Tips för Python-typ som Iterator[pandas.Series] ->Iterator[pandas.Series].

Den här Pandas UDF är användbar när UDF-körningen kräver initiering av något tillstånd, till exempel att läsa in en maskininlärningsmodellfil för att tillämpa slutsatsdragning för varje indatabatch.

I följande exempel visas hur du skapar en Pandas UDF med iteratorstöd.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterator för flera serier till iterator i serie UDF

En iterator för flera serier till iterator i serie UDF har liknande egenskaper och begränsningar som Iterator för serie till iterator i serie UDF. Den angivna funktionen tar en iterator av batchar och matar ut en iterator med batchar. Det är också användbart när UDF-körningen kräver att vissa tillstånd initieras.

Skillnaderna är:

  • Den underliggande Python-funktionen tar en iterator av en tupplar av Pandas Series.
  • Den omslutna Pandas UDF tar flera Spark-kolumner som indata.

Du anger typtipsen som Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Serie till skalär UDF

Serier till skalär pandas UDF:er liknar Spark-aggregeringsfunktioner. En serie till skalär Pandas UDF definierar en aggregering från en eller flera Pandas-serier till ett skalärt värde, där varje Pandas-serie representerar en Spark-kolumn. Du använder en serie för att skala Pandas UDF med API:er som select, withColumn, groupBy.aggoch pyspark.sql.Window.

Du uttrycker typtipset som pandas.Series, ... ->Any. Returtypen ska vara en primitiv datatyp och den returnerade skalären kan vara antingen en Primitiv Python-typ, till exempel int eller float en NumPy-datatyp som numpy.int64 eller numpy.float64. Any bör helst vara en specifik skalär typ.

Den här typen av UDF stöder inte partiell aggregering och alla data för varje grupp läses in i minnet.

I följande exempel visas hur du använder den här typen av UDF för att beräkna medelvärdet med select, groupByoch window åtgärder:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Detaljerad användning finns i pyspark.sql.functions.pandas_udf.

Förbrukning

Ange batchstorlek för pil

Kommentar

Den här konfigurationen påverkar inte beräkning som konfigurerats med läget för delad åtkomst och Databricks Runtime 13.3 LTS till och med 14.2.

Datapartitioner i Spark konverteras till Pilpostbatch, vilket tillfälligt kan leda till hög minnesanvändning i JVM. För att undvika möjliga undantag från minnet kan du justera storleken på pilpostbatcherna genom att ställa in konfigurationen spark.sql.execution.arrow.maxRecordsPerBatch på ett heltal som avgör det maximala antalet rader för varje batch. Standardvärdet är 10 000 poster per batch. Om antalet kolumner är stort bör värdet justeras i enlighet med detta. Med den här gränsen delas varje datapartition in i 1 eller fler postbatch för bearbetning.

Tidsstämpel med tidszonssemantik

Spark lagrar internt tidsstämplar som UTC-värden, och tidsstämpeldata som hämtas utan en angiven tidszon konverteras som lokal tid till UTC med mikrosekundersupplösning.

När tidsstämpeldata exporteras eller visas i Spark används sessionstidszonen för att lokalisera tidsstämpelvärdena. Tidszonen för sessionen anges med konfigurationen spark.sql.session.timeZone och är som standard den lokala tidszonen för JVM-systemet. Pandas använder en datetime64 typ med nanosekundersupplösning, , datetime64[ns]med valfri tidszon per kolumn.

När tidsstämpeldata överförs från Spark till Pandas konverteras de till nanosekunder och varje kolumn konverteras till tidszonen för Spark-sessionen och lokaliseras sedan till tidszonen, vilket tar bort tidszonen och visar värden som lokal tid. Detta inträffar när du anropar toPandas() eller pandas_udf med tidsstämpelkolumner.

När tidsstämpeldata överförs från Pandas till Spark konverteras de till UTC-mikrosekunder. Detta inträffar när du anropar createDataFrame med en Pandas DataFrame eller när du returnerar en tidsstämpel från en Pandas UDF. Dessa konverteringar görs automatiskt för att säkerställa att Spark har data i förväntat format, så det är inte nödvändigt att göra någon av dessa konverteringar själv. Alla nanosekunders värden trunkeras.

En standard-UDF läser in tidsstämpeldata som Python datetime-objekt, vilket skiljer sig från en Pandas-tidsstämpel. För att få bästa prestanda rekommenderar vi att du använder pandas tidsseriefunktioner när du arbetar med tidsstämplar i en Pandas UDF. Mer information finns i Time Series/Date-funktioner.

Exempelnotebook-fil

Följande notebook-fil illustrerar de prestandaförbättringar som du kan uppnå med Pandas UDF:er:

Pandas UDF:er benchmark notebook

Hämta notebook-fil