Funkcje skalarne zdefiniowane przez użytkownika — Python
Ten artykuł zawiera przykłady funkcji zdefiniowanej przez użytkownika języka Python (UDF). Przedstawia on sposób rejestrowania funkcji zdefiniowanych przez użytkownika, wywoływania funkcji zdefiniowanych przez użytkownika i zapewnia zastrzeżenia dotyczące kolejności oceny podexpressionów w usłudze Spark SQL.
W środowisku Databricks Runtime 14.0 lub nowszym można użyć funkcji tabeli zdefiniowanej przez użytkownika języka Python (UDTFs), aby zarejestrować funkcje zwracające całe relacje zamiast wartości skalarnych. Zobacz Funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs).
Uwaga
W środowisku Databricks Runtime 12.2 LTS i nowszym funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas nie są obsługiwane w obliczeniach wykazu aparatu Unity korzystających z trybu dostępu współdzielonego. Funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas są obsługiwane w środowisku Databricks Runtime 13.3 LTS i nowszym dla wszystkich trybów dostępu.
W środowisku Databricks Runtime 13.3 LTS i nowszym można zarejestrować scalarne funkcje zdefiniowane przez użytkownika języka Python w katalogu aparatu Unity przy użyciu składni JĘZYKA SQL. Zobacz Funkcje zdefiniowane przez użytkownika (UDF) w wykazie aparatu Unity.
Rejestrowanie funkcji jako funkcji zdefiniowanej przez użytkownika
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Opcjonalnie można ustawić zwracany typ funkcji zdefiniowanej przez użytkownika. Domyślnym typem zwracania jest StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Wywoływanie funkcji zdefiniowanej przez użytkownika w usłudze Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Używanie funkcji UDF z ramkami danych
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternatywnie można zadeklarować tę samą funkcję zdefiniowaną przez użytkownika przy użyciu składni adnotacji:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Kolejność oceny i sprawdzanie wartości null
Usługa Spark SQL (w tym sql i interfejs API elementu DataFrame i zestawu danych) nie gwarantuje kolejności oceny podexpressionów. W szczególności dane wejściowe operatora lub funkcji nie muszą być oceniane od lewej do prawej ani w innej stałej kolejności. Na przykład wyrażenia logiczne AND
i OR
nie mają semantyki od lewej do prawej "zwarcie".
W związku z tym niebezpieczne jest poleganie na skutkach ubocznych lub kolejności obliczania wyrażeń logicznych oraz kolejności WHERE
klauzul i HAVING
, ponieważ takie wyrażenia i klauzule można zmienić kolejność podczas optymalizacji zapytań i planowania. W szczególności jeśli funkcja UDF opiera się na semantyce zwarciowej w języku SQL w celu sprawdzania wartości null, nie ma gwarancji, że sprawdzanie wartości null nastąpi przed wywołaniem funkcji zdefiniowanej przez użytkownika. Na przykład:
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Ta WHERE
klauzula nie gwarantuje wywołania funkcji zdefiniowanej strlen
przez użytkownika po odfiltrowaniu wartości null.
Aby wykonać odpowiednie sprawdzanie wartości null, zalecamy wykonanie jednej z następujących czynności:
- Upewnij się, że funkcja UDF jest świadoma wartości null i sprawdza wartość null wewnątrz samej funkcji zdefiniowanej przez użytkownika
- Używanie
IF
wyrażeń lubCASE WHEN
do sprawdzania wartości null i wywoływanie funkcji zdefiniowanej przez użytkownika w gałęzi warunkowej
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Ograniczenia
- Funkcje zdefiniowane przez użytkownika PySpark w udostępnionych klastrach lub bezserwerowych obiektach obliczeniowych nie mogą uzyskać dostępu do folderów Git, plików obszaru roboczego lub woluminów UC w celu zaimportowania modułów w środowisku Databricks Runtime 14.2 lub nowszym.