User-defined scalar functions - Python

This article contains Python user-defined function (UDF) examples. It shows how to register and invoke UDFs, and describes caveats about the evaluation order of subexpressions in Spark SQL.

In Databricks Runtime 14.0 and above, you can use Python user-defined table functions (UDTFs) to register functions that return entire relations instead of scalar values. See Python user-defined table functions (UDTFs).

Note

In Databricks Runtime 12.2 LTS and below, Python UDFs and Pandas UDFs are not supported on Unity Catalog compute that uses shared access mode. Scalar Python UDFs and Pandas UDFs are supported in Databricks Runtime 13.3 LTS and above for all access modes.

In Databricks Runtime 13.3 LTS and above, you can register scalar Python UDFs to Unity Catalog using SQL syntax. See User-defined functions (UDFs) in Unity Catalog.

Register a function as a UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

You can optionally set the return type of your UDF. The default return type is StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
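Because the declared return type describes what Spark expects back from the function, it can help to make the Python function return a matching Python type explicitly. The following is a minimal sketch rather than part of the original examples: `squared_long`, `register_squared_long`, and the UDF name `squaredLongSketch` are hypothetical, and `spark` is assumed to be an active SparkSession such as the one a Databricks notebook provides.

```python
def squared_long(s):
    """Square s and return a Python int, to match a declared LongType."""
    if s is None:
        # Propagate SQL NULL instead of raising a TypeError on None.
        return None
    value = int(s)
    return value * value


def register_squared_long(spark):
    """Register the function under a hypothetical name with an explicit LongType."""
    # Imported lazily so the pure-Python function above can be tested without Spark.
    from pyspark.sql.types import LongType

    return spark.udf.register("squaredLongSketch", squared_long, LongType())
```

Keeping the logic in a plain function means it can be unit tested without a cluster, and the registration step stays a one-line wrapper.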

Call the UDF in Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Use UDF with DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Alternatively, you can declare the same UDF using decorator syntax:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Evaluation order and null checking

Spark SQL (including SQL and the DataFrame and Dataset API) does not guarantee the order of evaluation of subexpressions. In particular, the inputs of an operator or function are not necessarily evaluated left-to-right or in any other fixed order. For example, logical AND and OR expressions do not have left-to-right “short-circuiting” semantics.

Therefore, it is dangerous to rely on the side effects or evaluation order of Boolean expressions, or on the order of WHERE and HAVING clauses, because such expressions and clauses can be reordered during query optimization and planning. Specifically, if a UDF relies on short-circuiting semantics in SQL for null checking, there is no guarantee that the null check happens before the UDF is invoked. For example:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

This WHERE clause does not guarantee that the strlen UDF is invoked only after nulls have been filtered out, so the UDF can still receive a null input.

To perform proper null checking, we recommend that you do either of the following:

  • Make the UDF itself null-aware and do null checking inside the UDF itself
  • Use IF or CASE WHEN expressions to do the null check and invoke the UDF in a conditional branch

spark.udf.register("strlen_nullsafe", lambda s: len(s) if s is not None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") # ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   # ok
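The first recommendation can also be written as a named function, which is easier to unit test than a lambda. The following is a minimal sketch under stated assumptions: `strlen_or_null` and `register_strlen_or_null` are hypothetical names, and `spark` is assumed to be an active SparkSession. Unlike strlen_nullsafe, this variant returns None (SQL NULL) for a null input instead of the sentinel -1.

```python
def strlen_or_null(s):
    """Return the length of s, or None when s is None."""
    if s is None:
        # The null check lives inside the UDF, so it does not depend on
        # the order in which Spark evaluates the WHERE clause.
        return None
    return len(s)


def register_strlen_or_null(spark):
    """Register the null-aware function as a SQL UDF that returns an int."""
    return spark.udf.register("strlen_or_null", strlen_or_null, "int")
```

With this variant, a predicate such as `strlen_or_null(s) > 1` evaluates to NULL for null inputs, so those rows are filtered out without a separate `s is not null` check.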

Limitations

The following limitations apply to PySpark UDFs:

  • Module import restrictions: On Databricks Runtime 14.2 and below, PySpark UDFs on shared clusters and serverless compute cannot import modules from Git folders, workspace files, or Unity Catalog volumes.

  • Broadcast variables: PySpark UDFs on shared clusters and serverless compute do not support broadcast variables.

  • Memory limit: PySpark UDFs on serverless compute have a memory limit of 1 GB per PySpark UDF. Exceeding this limit results in the following error: [UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.
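Where broadcast variables are unavailable, one possible workaround for small lookup data (an assumption, not something this article prescribes) is to capture the data in the function's closure, so that it is serialized together with the UDF. The following is a minimal sketch: `make_country_lookup`, `country_name`, `to_udf`, and the sample mapping are all hypothetical. Captured data counts toward the UDF's memory use, so this only suits small mappings.

```python
def make_country_lookup(mapping):
    """Return a function that resolves codes using a dict captured in its closure."""
    lookup = dict(mapping)  # private copy, shipped along with the function

    def country_name(code):
        if code is None:
            return None  # keep SQL NULL as NULL
        return lookup.get(code, "unknown")

    return country_name


# Hypothetical sample data; a real mapping would come from your own source.
country_name = make_country_lookup({"TH": "Thailand", "DE": "Germany"})


def to_udf(fn):
    """Wrap a plain Python function as a string-returning PySpark UDF."""
    # Imported lazily so the lookup logic can be tested without Spark.
    from pyspark.sql.functions import udf

    return udf(fn, "string")
```

For example, `to_udf(country_name)` could then be applied to a column in `df.select(...)` in the same way as squared_udf.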