användardefinierade funktioner – pandas
En användardefinierad Pandas-funktion (UDF) – även kallad vektoriserad UDF – är en användardefinierad funktion som använder Apache Arrow för att överföra data och pandas för att arbeta med data. Pandas UDF:er tillåter vektoriserade åtgärder som kan öka prestanda upp till 100x jämfört med python-UDF:er rad i taget.
Bakgrundsinformation finns i blogginlägget Nya Pandas UDF:er och Python Type Hints i den kommande versionen av Apache Spark 3.0.
Du definierar en Pandas UDF med nyckelordet pandas_udf
som dekoratör och omsluter funktionen med ett Tips av Python-typ.
Den här artikeln beskriver de olika typerna av Pandas UDF:er och visar hur du använder Pandas UDF:er med typtips.
Serie till serie UDF
Du använder en UDF från serie till serie för att vektorisera skaläråtgärder.
Du kan använda dem med API:er som select
och withColumn
.
Python-funktionen bör ta en Pandas-serie som indata och returnera en Pandas-serie med samma längd, och du bör ange dessa i Tips av Python-typ. Spark kör en Pandas UDF genom att dela upp kolumner i batchar, anropa funktionen för varje batch som en delmängd av data och sedan sammanfoga resultaten.
I följande exempel visas hur du skapar en Pandas UDF som beräknar produkten av 2 kolumner.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterator av serie till iterator av serie UDF
En iterator-UDF är samma som en skalär Pandas UDF förutom:
- Python-funktionen
- Tar en iterator av batchar i stället för en enda indatabatch som indata.
- Returnerar en iterator för utdatabatcherna i stället för en enda utdatabatch.
- Längden på hela utdata i iteratorn ska vara samma som längden på hela indata.
- Den inkapslade Pandas UDF tar en enda Spark-kolumn som indata.
Du bör ange Tips för Python-typ som Iterator[pandas.Series]
->Iterator[pandas.Series]
.
Den här Pandas UDF är användbar när UDF-körningen kräver initiering av något tillstånd, till exempel att läsa in en maskininlärningsmodellfil för att tillämpa slutsatsdragning för varje indatabatch.
I följande exempel visas hur du skapar en Pandas UDF med iteratorstöd.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterator för flera serier till iterator i serie UDF
En iterator för flera serier till iterator i serie UDF har liknande egenskaper och begränsningar som Iterator för serie till iterator i serie UDF. Den angivna funktionen tar en iterator av batchar och matar ut en iterator med batchar. Det är också användbart när UDF-körningen kräver att vissa tillstånd initieras.
Skillnaderna är:
- Den underliggande Python-funktionen tar en iterator av en tupplar av Pandas Series.
- Den omslutna Pandas UDF tar flera Spark-kolumner som indata.
Du anger typtipsen som Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Serie till skalär UDF
Serier till skalär pandas UDF:er liknar Spark-aggregeringsfunktioner.
En serie till skalär Pandas UDF definierar en aggregering från en eller flera Pandas-serier till ett skalärt värde, där varje Pandas-serie representerar en Spark-kolumn.
Du använder en serie för att skala Pandas UDF med API:er som select
, withColumn
, groupBy.agg
och pyspark.sql.Window.
Du uttrycker typtipset som pandas.Series, ...
->Any
. Returtypen ska vara en primitiv datatyp och den returnerade skalären kan vara antingen en Primitiv Python-typ, till exempel int
eller float
en NumPy-datatyp som numpy.int64
eller numpy.float64
.
Any
bör helst vara en specifik skalär typ.
Den här typen av UDF stöder inte partiell aggregering och alla data för varje grupp läses in i minnet.
I följande exempel visas hur du använder den här typen av UDF för att beräkna medelvärdet med select
, groupBy
och window
åtgärder:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Detaljerad användning finns i pyspark.sql.functions.pandas_udf.
Förbrukning
Ange batchstorlek för pil
Kommentar
Den här konfigurationen påverkar inte beräkning som konfigurerats med läget för delad åtkomst och Databricks Runtime 13.3 LTS till och med 14.2.
Datapartitioner i Spark konverteras till Pilpostbatch, vilket tillfälligt kan leda till hög minnesanvändning i JVM. För att undvika möjliga undantag från minnet kan du justera storleken på pilpostbatcherna genom att ställa in konfigurationen spark.sql.execution.arrow.maxRecordsPerBatch
på ett heltal som avgör det maximala antalet rader för varje batch. Standardvärdet är 10 000 poster per batch. Om antalet kolumner är stort bör värdet justeras i enlighet med detta. Med denna gräns delas varje datapartition in i ett eller flera postpaket för bearbetning.
Tidsstämpel med tidszonssemantik
Spark lagrar internt tidsstämplar som UTC-värden, och tidsstämpeldata som hämtas utan en angiven tidszon konverteras som lokal tid till UTC med mikrosekundersupplösning.
När tidsstämpeldata exporteras eller visas i Spark används sessionstidszonen för att lokalisera tidsstämpelvärdena. Sessionens tidszon anges med spark.sql.session.timeZone
konfiguration och är standard för den lokala tidszonen för JVM-systemet. Pandas använder en datetime64
typ med nanosekundersupplösning, datetime64[ns]
, med valfri tidszon per kolumn.
När tidsstämpeldata överförs från Spark till Pandas konverteras de till nanosekunder och varje kolumn konverteras till tidszonen för Spark-sessionen och lokaliseras sedan till tidszonen, vilket tar bort tidszonen och visar värden som lokal tid. Detta inträffar när du anropar toPandas()
eller pandas_udf
med tidsstämpelkolumner.
När tidsstämpeldata överförs från Pandas till Spark konverteras de till UTC-mikrosekunder. Detta inträffar när du anropar createDataFrame
med en Pandas DataFrame eller när du returnerar en tidsstämpel från en Pandas UDF. Dessa konverteringar görs automatiskt för att säkerställa att Spark har data i förväntat format, så det är inte nödvändigt att göra någon av dessa konverteringar själv. Eventuella nanosekundvärden trunkeras.
En standard-UDF läser in tidsstämpeldata som Python datetime-objekt, vilket skiljer sig från en Pandas-tidsstämpel. För att få bästa prestanda rekommenderar vi att du använder pandas tidsseriefunktioner när du arbetar med tidsstämplar i en Pandas UDF. Mer information finns i Time Series/Date-funktioner.
Exempelnotebook-fil
Följande notebook-fil illustrerar de prestandaförbättringar som du kan uppnå med Pandas UDF:er: