ユーザー定義スカラー関数 - Python
この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 ここでは、UDF を登録する方法、UDF を呼び出す方法、Spark SQL における部分式の評価順序に関する注意点を示します。
Databricks Runtime 14.0 以降では、Python ユーザー定義の table 関数 (UDF) を使用して、スカラー valuesではなく、リレーション全体を返す関数を登録できます。 Python ユーザー定義
Note
Databricks Runtime 12.2 LTS 以下では、Python UDF と Pandas UDF は、共有アクセス モードを使用する Unity Catalog コンピューティングではサポートされていません。 スカラー Python UDF と Pandas UDF は、Databricks Runtime 13.3 LTS 以降で、すべてのアクセス モードでサポートされています。
Databricks Runtime 13.3 LTS 以降では、SQL 構文を使用してスカラー Python UDF を Unity Catalog に登録できます。 Unity
関数を UDF として登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
必要に応じて、UDF の戻り値の型に対して set を適用できます。 既定の戻り値の型は StringType
です。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL で UDF を呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
DataFrame で UDF を使用する
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
また、次のように注釈構文を使用して同じ UDF を宣言することもできます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序と null チェック
Spark SQL (SQL、DataFrame、データセット API を含む) では、部分式の評価の順序は保証されません。 特に、演算子や関数の入力は、必ずしも左から右へ、またはその他の決まった順序で評価されるとは限りません。 たとえば、AND
および OR
論理式には、左から右への "短絡" セマンティクスはありません。
したがって、クエリの最適化および計画の際に式や句の順序は並べ替えられる可能性があるため、ブール式の副作用や評価の順序および WHERE
と HAVING
句の順序に依存することは危険です。 具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、UDF を呼び出す前に、null チェックが行われる保証はありません。 たとえば、次のように入力します。
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
この WHERE
句を使用しても、null を除外した後に strlen
UDF が呼び出されることは保証されません。
適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。
- UDF 自体を null で認識し、UDF 自体の内部で null チェックを実行する
IF
またはCASE WHEN
式を使用して null チェックを実行し、条件分岐で UDF を呼び出す
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
制限事項
PySpark UDF には、次の制限事項が適用されます。
モジュールのインポート制限: 共有クラスターとサーバーレス コンピューティング上の PySpark UDF、Git フォルダー、ワークスペース ファイル、または Unity CatalogVolumes にアクセスして Databricks Runtime 14.2 以降にモジュールをインポートすることはできません。
ブロードキャスト変数: 共有クラスターとサーバーレス コンピューティングでの PySpark UDF の は、ブロードキャスト変数をサポートしていません。
メモリ limit: サーバーレス コンピューティング上の PySpark UDF には、PySpark UDF あたり 1 GB のメモリ limit があります。 この limit を超えると、次のエラーが発生します:
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.