Funkcje skalarne zdefiniowane przez użytkownika — Python
Ten artykuł zawiera przykłady funkcji zdefiniowanej przez użytkownika języka Python (UDF). Przedstawia on sposób rejestrowania funkcji zdefiniowanych przez użytkownika, wywoływania funkcji zdefiniowanych przez użytkownika i zapewnia zastrzeżenia dotyczące kolejności oceny podexpressionów w usłudze Spark SQL.
W środowisku Databricks Runtime 14.0 lub nowszym można użyć funkcji tabeli zdefiniowanej przez użytkownika języka Python (UDTFs), aby zarejestrować funkcje zwracające całe relacje zamiast wartości skalarnych. Zobacz funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs).
Uwaga
W środowisku Databricks Runtime 12.2 LTS i starszym funkcje UDF języka Python i Pandas nie są obsługiwane w obliczeniach katalogu Unity korzystających z trybu współdzielonego dostępu. Funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas są obsługiwane w środowisku Databricks Runtime 13.3 LTS i nowszym dla wszystkich trybów dostępu.
W środowisku Databricks Runtime 13.3 LTS i nowszym można zarejestrować skalarne funkcje Pythona zdefiniowane przez użytkownika (UDF) w Unity Catalog przy użyciu składni SQL. Patrz funkcje zdefiniowane przez użytkownika (UDF) w Unity Catalog.
Rejestrowanie funkcji jako funkcji zdefiniowanej przez użytkownika
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Opcjonalnie można ustawić typ zwracany funkcji zdefiniowanej przez użytkownika. Domyślnym typem zwracania jest StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Wywoływanie funkcji zdefiniowanej przez użytkownika w usłudze Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Używanie funkcji UDF z ramkami danych
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternatywnie można zadeklarować tę samą funkcję zdefiniowaną przez użytkownika przy użyciu składni adnotacji:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Kolejność oceny i sprawdzanie wartości null
Usługa Spark SQL (w tym sql i interfejs API elementu DataFrame i zestawu danych) nie gwarantuje kolejności oceny podexpressionów. W szczególności dane wejściowe operatora lub funkcji nie muszą być oceniane od lewej do prawej ani w innej stałej kolejności. Na przykład wyrażenia logiczne AND
i OR
nie mają semantyki od lewej do prawej "zwarcie".
W związku z tym niebezpieczne jest poleganie na skutkach ubocznych lub kolejności obliczania wyrażeń logicznych oraz kolejności WHERE
klauzul i HAVING
, ponieważ takie wyrażenia i klauzule można zmienić kolejność podczas optymalizacji zapytań i planowania. W szczególności jeśli funkcja UDF opiera się na semantyce zwarciowej w języku SQL w celu sprawdzania wartości null, nie ma gwarancji, że sprawdzanie wartości null nastąpi przed wywołaniem funkcji zdefiniowanej przez użytkownika. Na przykład:
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Ta WHERE
klauzula nie gwarantuje wywołania funkcji zdefiniowanej strlen
przez użytkownika po odfiltrowaniu wartości null.
Aby wykonać odpowiednie sprawdzanie wartości null, zalecamy wykonanie jednej z następujących czynności:
- Upewnij się, że funkcja UDF jest świadoma wartości null i sprawdza wartość null wewnątrz samej funkcji zdefiniowanej przez użytkownika
- Używanie
IF
wyrażeń lubCASE WHEN
do sprawdzania wartości null i wywoływanie funkcji zdefiniowanej przez użytkownika w gałęzi warunkowej
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Ograniczenia
Następujące ograniczenia dotyczą UDF-ów PySpark:
Ograniczenia importowania modułu: UDF PySpark w udostępnionych klastrach i obliczeniach bezserwerowych nie mogą uzyskać dostępu do folderów Git, plików obszarów roboczych ani woluminów Katalogu Unity w celu zaimportowania modułów w środowisku Databricks Runtime 14.2 i wcześniejszych.
Zmienne rozgłoszeniowe: funkcje zdefiniowane przez użytkownika PySpark nie obsługują zmiennych rozgłoszeniowych na udostępnionych klastrach i w obliczeniach bezserwerowych.
limit pamięci: funkcje UDF PySpark w obliczeniach bezserwerowych mają limit pamięci 1 GB każda. Przekroczenie tego limitu powoduje następujący błąd:
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.