Поделиться через


Определяемые пользователем скалярные функции — Python

В этой статье приведены примеры определяемых пользователем функций Python. В нем показано, как зарегистрировать определяемые пользователем функции, как вызвать определяемые пользователем функции и предоставить предостережения о порядке вычисления вложенных выражений в Spark SQL.

В Databricks Runtime 14.0 и более поздних версиях можно использовать определяемые пользователем функции таблиц Python для регистрации функций, возвращающих все отношения вместо скалярных значений. См. определяемые пользователем табличные функции Python (UDTFs).

Примечание.

В Databricks Runtime 12.2 LTS и ниже, функции, определяемые пользователем (UDF) Python, и UDF Pandas не поддерживаются на вычислительных ресурсах каталога Unity, работающих в режиме общего доступа. Скалярные ОПРЕДЕЛяемые пользователем Python и Pandas поддерживаются в Databricks Runtime 13.3 LTS и более поздних версиях для всех режимов доступа.

В Databricks Runtime 13.3 LTS и более поздних версиях можно зарегистрировать скалярные пользовательские файлы Python в каталоге Unity с помощью синтаксиса SQL. См. функции, определяемые пользователем (UDF), в каталоге Unity.

Регистрация определяемой пользователем функции

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

При необходимости можно задать тип возвращаемого объекта UDF. По умолчанию это StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Вызов определяемой пользователем функции в Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Использование определяемой пользователем функции с кадрами данных

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Вы также можете объявить определяемую пользователем функцию с помощью синтаксиса аннотации:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Порядок вычисления и проверка значений NULL

В SQL Spark (включая SQL, а также API кадров и наборов данных) не гарантируется определенный порядок вычисления частей выражения. В частности, входные данные оператора или функции не обязательно вычисляются слева направо или в любом другом фиксированном порядке. Например, логические выражения AND и OR не имеют привычной семантики слева направо.

Таким образом, не следует полагаться на побочные эффекты или порядок вычисления логических выражений, а также порядок предложений WHERE и HAVING, так как этот порядок и правила применения предложений могут изменяться в результате оптимизации или планирования запросов. В частности, если определяемая пользователем функция использует привычную семантику в SQL для проверки значений NULL, нет гарантии того, что эта проверка произойдет перед вызовом функции. Например,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

В этом примере выражение WHERE не гарантирует, что определяемая пользователем функция strlen будет вызываться после фильтрации значений NULL.

Для правильной проверки значений NULL рекомендуется выполнить одно из следующих действий:

  • Реализуйте в функции поддержку значения NULL и выполняйте проверку этих значений внутри самой функции.
  • Используйте выражения IF или CASE WHEN для проверки значения NULL и вызова определяемой пользователем функции в условной ветви.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Ограничения

Следующие ограничения применяются к UDF PySpark.

  • ограничения на импорт модуля: UDF PySpark в общих кластерах и бессерверных вычислительных средах не могут получить доступ к папкам Git, файлам рабочей области или томам Unity Catalog для импорта модулей в Databricks Runtime версий 14.2 и ниже.

  • переменные широковещательной трансляции: Определяемые пользователем функции в PySpark на общих кластерах и бессерверных вычислительных ресурсах не поддерживают широковещательные переменные.

  • ограничение памяти: для пользовательских функций PySpark на платформе бессерверных вычислений установлено ограничение памяти 1 ГБ на каждую PySpark UDF. Превышение этого ограничения приводит к следующей ошибке: [UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.