Udostępnij za pośrednictwem


Funkcje skalarne zdefiniowane przez użytkownika — Python

Ten artykuł zawiera przykłady funkcji zdefiniowanej przez użytkownika języka Python (UDF). Przedstawia on sposób rejestrowania funkcji zdefiniowanych przez użytkownika, wywoływania funkcji zdefiniowanych przez użytkownika i zapewnia zastrzeżenia dotyczące kolejności oceny podexpressionów w usłudze Spark SQL.

W środowisku Databricks Runtime 14.0 lub nowszym można użyć funkcji tabeli zdefiniowanej przez użytkownika języka Python (UDTFs), aby zarejestrować funkcje zwracające całe relacje zamiast wartości skalarnych. Zobacz Funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDTFs).

Uwaga

W środowisku Databricks Runtime 12.2 LTS i nowszym funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas nie są obsługiwane w obliczeniach wykazu aparatu Unity korzystających z trybu dostępu współdzielonego. Funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas są obsługiwane w środowisku Databricks Runtime 13.3 LTS i nowszym dla wszystkich trybów dostępu.

W środowisku Databricks Runtime 13.3 LTS i nowszym można zarejestrować scalarne funkcje zdefiniowane przez użytkownika języka Python w katalogu aparatu Unity przy użyciu składni JĘZYKA SQL. Zobacz Funkcje zdefiniowane przez użytkownika (UDF) w wykazie aparatu Unity.

Rejestrowanie funkcji jako funkcji zdefiniowanej przez użytkownika

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Opcjonalnie można ustawić zwracany typ funkcji zdefiniowanej przez użytkownika. Domyślnym typem zwracania jest StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Wywoływanie funkcji zdefiniowanej przez użytkownika w usłudze Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Używanie funkcji UDF z ramkami danych

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternatywnie można zadeklarować tę samą funkcję zdefiniowaną przez użytkownika przy użyciu składni adnotacji:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Kolejność oceny i sprawdzanie wartości null

Usługa Spark SQL (w tym sql i interfejs API elementu DataFrame i zestawu danych) nie gwarantuje kolejności oceny podexpressionów. W szczególności dane wejściowe operatora lub funkcji nie muszą być oceniane od lewej do prawej ani w innej stałej kolejności. Na przykład wyrażenia logiczne AND i OR nie mają semantyki od lewej do prawej "zwarcie".

W związku z tym niebezpieczne jest poleganie na skutkach ubocznych lub kolejności obliczania wyrażeń logicznych oraz kolejności WHERE klauzul i HAVING , ponieważ takie wyrażenia i klauzule można zmienić kolejność podczas optymalizacji zapytań i planowania. W szczególności jeśli funkcja UDF opiera się na semantyce zwarciowej w języku SQL w celu sprawdzania wartości null, nie ma gwarancji, że sprawdzanie wartości null nastąpi przed wywołaniem funkcji zdefiniowanej przez użytkownika. Na przykład:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Ta WHERE klauzula nie gwarantuje wywołania funkcji zdefiniowanej strlen przez użytkownika po odfiltrowaniu wartości null.

Aby wykonać odpowiednie sprawdzanie wartości null, zalecamy wykonanie jednej z następujących czynności:

  • Upewnij się, że funkcja UDF jest świadoma wartości null i sprawdza wartość null wewnątrz samej funkcji zdefiniowanej przez użytkownika
  • Używanie IF wyrażeń lub CASE WHEN do sprawdzania wartości null i wywoływanie funkcji zdefiniowanej przez użytkownika w gałęzi warunkowej
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Ograniczenia

  • Funkcje zdefiniowane przez użytkownika PySpark w udostępnionych klastrach lub bezserwerowych obiektach obliczeniowych nie mogą uzyskać dostępu do folderów Git, plików obszaru roboczego lub woluminów UC w celu zaimportowania modułów w środowisku Databricks Runtime 14.2 lub nowszym.