Benutzerdefinierte Skalarfunktionen: Python
In diesem Artikel finden Sie Beispiele für benutzerdefinierte Python-Funktionen (User-Defined Functions, UDFs). Darin wird gezeigt, wie Sie UDFs registrieren, wie Sie UDFs aufrufen und welche Einschränkungen in Bezug auf die Auswertungsreihenfolge von Teilausdrücken in Spark SQL bestehen.
In Databricks Runtime 14.0 und höher können Sie benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) von Python verwenden, um Funktionen zu registrieren, die ganze Beziehungen anstelle von Skalarwerten zurückgeben. Siehe Benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) in Python.
Hinweis
In Databricks Runtime 12.2 LTS und unten werden Python UDFs und Pandas UDFs nicht auf Unity Catalog Compute unterstützt, die den Modus für gemeinsame Zugriffe verwendet. Skalare Python UDFs und Pandas UDFs werden in Databricks Runtime 13.3 LTS und höher für alle Zugriffsmodi unterstützt.
In Databricks Runtime 13.3 LTS und höher können Sie skalare Python-UDFs mit SQL-Syntax bei Unity Catalog registrieren. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen (UDFs, user-defined functions) in Unity Catalog.
Registrieren einer Funktion als UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Optional können Sie den Rückgabetyp Ihrer UDF festlegen. Der Standardrückgabetyp ist StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Aufrufen der UDF in Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Verwenden von UDFs mit Datenrahmen
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternativ können Sie dieselbe UDF mithilfe der Anmerkungssyntax deklarieren:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Auswertungsreihenfolge und NULL-Überprüfung
Spark SQL (einschließlich SQL und der Datenrahmen- und Dataset-APIs) garantiert nicht die Reihenfolge der Auswertung von Teilausdrücken. Insbesondere werden die Eingaben eines Operators oder einer Funktion nicht zwangsläufig von links nach rechts oder in einer anderen festen Reihenfolge ausgewertet. Beispielsweise gilt für logische AND
- und OR
-Ausdrücke keine „Kurzschluss“-Semantik von links nach rechts.
Daher ist es gefährlich, sich auf die Nebeneffekte oder die Reihenfolge der Auswertung von booleschen Ausdrücken und die Reihenfolge von WHERE
- und HAVING
-Klauseln zu verlassen, da solche Ausdrücke und Klauseln während der Abfrageoptimierung und -planung neu angeordnet werden können. Insbesondere wenn eine UDF für die NULL-Überprüfung auf die Kurzschluss-Semantik in SQL angewiesen ist, gibt es keine Garantie, dass die NULL-Überprüfung vor dem Aufrufen der UDF erfolgt. Ein auf ein Objekt angewendeter
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Diese WHERE
-Klausel garantiert nicht, dass die UDF strlen
nach dem Herausfiltern von NULL-Werten aufgerufen wird.
Es wird empfohlen, eine der folgenden Aktionen auszuführen, um eine ordnungsgemäße NULL-Überprüfung durchzuführen:
- Machen Sie die UDF selbst nullfähig, und führen Sie die NULL-Überprüfung innerhalb der UDF selbst durch.
- Verwenden Sie
IF
- oderCASE WHEN
-Ausdrücke zum Durchführen der NULL-Überprüfung, und rufen Sie die UDF in einer bedingten Verzweigung auf.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Begrenzungen
- PySpark UDFs auf freigegebenen Clustern oder serverlosen Berechnungen können nicht auf Git-Ordner, Arbeitsbereichsdateien oder UC Volumes zugreifen, um Module auf Databricks Runtime 14.2 und darunter zu importieren.