Funkcje biblioteki Pandas zdefiniowane przez użytkownika
Funkcja zdefiniowana przez użytkownika (UDF) biblioteki pandas ( znana również jako wektoryzowana funkcja UDF) to funkcja zdefiniowana przez użytkownika, która używa narzędzia Apache Arrow do przesyłania danych i biblioteki pandas do pracy z danymi. Funkcje zdefiniowane przez użytkownika biblioteki pandas umożliwiają wektoryzowane operacje, które mogą zwiększyć wydajność do 100 razy w porównaniu z funkcjami zdefiniowanymi przez użytkownika języka Python w wierszu w czasie.
Aby uzyskać podstawowe informacje, zobacz wpis w blogu New Pandas UDFs i Python Type Hints w nadchodzącej wersji platformy Apache Spark 3.0.
Definiowanie funkcji zdefiniowanej przez użytkownika biblioteki pandas przy użyciu słowa kluczowego pandas_udf
jako dekoratora i zawijanie funkcji za pomocą wskazówki typu języka Python.
W tym artykule opisano różne typy funkcji zdefiniowanych przez użytkownika biblioteki pandas i pokazano, jak używać funkcji zdefiniowanych przez użytkownika biblioteki pandas z wskazówkami dotyczącymi typów.
Seria do serii UDF
Funkcja UDF serii służy do wektorowania operacji skalarnych przy użyciu serii pandas.
Można ich używać z interfejsami API, takimi jak select
i withColumn
.
Funkcja języka Python powinna przyjmować serię pandas jako dane wejściowe i zwracać serię pandas o tej samej długości i należy je określić w wskazówkach dotyczących typu języka Python. Platforma Spark uruchamia funkcję UDF biblioteki pandas, dzieląc kolumny na partie, wywołując funkcję dla każdej partii jako podzbiór danych, a następnie łącząc wyniki.
W poniższym przykładzie pokazano, jak utworzyć funkcję zdefiniowanej przez użytkownika biblioteki pandas, która oblicza produkt 2 kolumn.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterator serii do iteratora funkcji zdefiniowanej przez użytkownika serii
Funkcja UDF iteratora jest taka sama jak funkcja zdefiniowanej przez użytkownika biblioteki pandas skalarnej z wyjątkiem:
- Funkcja języka Python
- Przyjmuje iterator partii zamiast pojedynczej partii wejściowej jako dane wejściowe.
- Zwraca iterator partii wyjściowych zamiast pojedynczej partii wyjściowej.
- Długość całych danych wyjściowych w iteratorze powinna być taka sama jak długość całego wejścia.
- Opakowana funkcja UDF biblioteki pandas przyjmuje pojedynczą kolumnę platformy Spark jako dane wejściowe.
Należy określić wskazówkę typu języka Python jako Iterator[pandas.Series]
->Iterator[pandas.Series]
.
Ta funkcja zdefiniowanej przez użytkownika biblioteki pandas jest przydatna, gdy wykonanie funkcji zdefiniowanej przez użytkownika wymaga zainicjowania określonego stanu, na przykład załadowania pliku modelu uczenia maszynowego w celu zastosowania wnioskowania do każdej partii wejściowej.
W poniższym przykładzie pokazano, jak utworzyć funkcję zdefiniowanej przez użytkownika biblioteki pandas z obsługą iteratora.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterator wielu serii do iteratora funkcji UDF serii
Iterator wielu serii do iteratora funkcji UDF serii ma podobne cechy i ograniczenia jak iterator serii do iteratora funkcji UDF serii. Określona funkcja przyjmuje iterator partii i generuje iterator partii. Jest to również przydatne, gdy wykonanie funkcji zdefiniowanej przez użytkownika wymaga zainicjowania określonego stanu.
Różnice są następujące:
- Podstawowa funkcja języka Python przyjmuje iterator krotki serii pandas.
- Opakowana funkcja UDF biblioteki pandas przyjmuje wiele kolumn platformy Spark jako dane wejściowe.
Należy określić wskazówki dotyczące typu jako Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Serie do skalarnych funkcji zdefiniowanej przez użytkownika
Serie do skalarnych funkcji zdefiniowanych przez użytkownika biblioteki pandas są podobne do funkcji agregujących platformy Spark.
Funkcja UDF serii do skalarnej biblioteki pandas definiuje agregację z co najmniej jednej serii pandas do wartości skalarnej, gdzie każda seria pandas reprezentuje kolumnę platformy Spark.
Używasz serii do skalowania biblioteki pandas UDF z interfejsami API, takimi jak select
, withColumn
, groupBy.agg
i pyspark.sql.Window.
Należy wyrazić wskazówkę typu jako pandas.Series, ...
->Any
. Zwracany typ powinien być typem danych pierwotnych, a zwracany skalar może być typem pierwotnym języka Python, na przykład lub typem danych NumPy, int
takim jak numpy.int64
lub float
numpy.float64
. Any
najlepiej być określonym typem skalarnym.
Ten typ funkcji zdefiniowanej przez użytkownika nie obsługuje częściowej agregacji, a wszystkie dane dla każdej grupy są ładowane do pamięci.
W poniższym przykładzie pokazano, jak używać tego typu funkcji zdefiniowanej przez użytkownika do obliczania średniej z operacjami select
, i window
: groupBy
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Aby uzyskać szczegółowe informacje o użyciu, zobacz pyspark.sql.functions.pandas_udf.
Użycie
Ustawianie rozmiaru partii strzałki
Uwaga
Ta konfiguracja nie ma wpływu na obliczenia skonfigurowane przy użyciu trybu dostępu współdzielonego i środowiska Databricks Runtime 13.3 LTS do wersji 14.2.
Partycje danych na platformie Spark są konwertowane na partie rekordów strzałki, co może tymczasowo prowadzić do wysokiego użycia pamięci w maszynie JVM. Aby uniknąć możliwych wyjątków braku pamięci, można dostosować rozmiar partii rekordów strzałki, ustawiając spark.sql.execution.arrow.maxRecordsPerBatch
konfigurację na liczbę całkowitą, która określa maksymalną liczbę wierszy dla każdej partii. Wartość domyślna to 10 000 rekordów na partię. Jeśli liczba kolumn jest duża, należy odpowiednio dostosować wartość. Korzystając z tego limitu, każda partycja danych jest podzielona na co najmniej 1 partie rekordów do przetwarzania.
Sygnatura czasowa z semantyka stref czasowych
Platforma Spark wewnętrznie przechowuje znaczniki czasu jako wartości UTC, a dane sygnatury czasowej wprowadzone bez określonej strefy czasowej są konwertowane jako czas lokalny na czas UTC z mikrosekundową rozdzielczością.
Gdy dane sygnatury czasowej są eksportowane lub wyświetlane na platformie Spark, strefa czasowa sesji służy do lokalizowania wartości znacznika czasu. Strefa czasowa sesji jest ustawiana z konfiguracją spark.sql.session.timeZone
i domyślnie ustawiona na lokalną strefę czasową systemu JVM. Biblioteka pandas używa datetime64
typu z rozdzielczością nanosekund, datetime64[ns]
, z opcjonalną strefą czasową na podstawie kolumny.
Gdy dane sygnatury czasowej są przesyłane z platformy Spark do biblioteki pandas, są konwertowane na nanosekundy, a każda kolumna jest konwertowana na strefę czasową sesji platformy Spark, a następnie zlokalizowana w tej strefie czasowej, która usuwa strefę czasową i wyświetla wartości jako czas lokalny. Dzieje się tak podczas wywoływania toPandas()
lub pandas_udf
z kolumnami sygnatury czasowej.
Gdy dane sygnatury czasowej są przesyłane z biblioteki pandas do platformy Spark, są konwertowane na mikrosekundy UTC. Dzieje się tak w przypadku wywoływania createDataFrame
za pomocą ramki danych pandas lub zwracania znacznika czasu z funkcji zdefiniowanej przez użytkownika biblioteki pandas. Te konwersje są wykonywane automatycznie, aby upewnić się, że platforma Spark ma dane w oczekiwanym formacie, więc nie jest konieczne samodzielne wykonanie żadnej z tych konwersji. Wszystkie nanosekundy są obcinane.
Standardowa funkcja UDF ładuje dane sygnatury czasowej jako obiekty daty/godziny języka Python, które różnią się od sygnatury czasowej biblioteki pandas. Aby uzyskać najlepszą wydajność, zalecamy używanie funkcji szeregów czasowych biblioteki pandas podczas pracy ze znacznikami czasu w funkcji zdefiniowanej przez użytkownika biblioteki pandas. Aby uzyskać szczegółowe informacje, zobacz Funkcje szeregów czasowych/dat.
Przykładowy notes
W poniższym notesie przedstawiono ulepszenia wydajności, które można osiągnąć za pomocą funkcji zdefiniowanych przez użytkownika biblioteki pandas: