Delen via


Door de gebruiker gedefinieerde scalaire functies - Python

Dit artikel bevat voorbeelden van door de gebruiker gedefinieerde Python-functie (UDF). Het laat zien hoe u UDF's registreert, UDF's aanroept en u een waarschuwing geeft over de evaluatievolgorde van subexpressies in Spark SQL.

In Databricks Runtime 14.0 en hoger kunt u door de gebruiker gedefinieerde Python-table-functies (UDDF's) gebruiken om functies te registreren die volledige relaties retourneren in plaats van scalaire values. Zie door de gebruiker gedefinieerde Python-table-functies (UDDF's).

Notitie

In Databricks Runtime 12.2 LTS en lager worden Python UDF's en Pandas UDF's niet ondersteund op Unity Catalog compute die gebruikmaakt van de modus voor gedeelde toegang. Scalaire Python UDF's en Pandas UDF's worden ondersteund in Databricks Runtime 13.3 LTS en hoger voor alle toegangsmodi.

In Databricks Runtime 13.3 LTS en hoger kunt u scalaire Python UDF's registreren bij Unity Catalog met behulp van SQL-syntaxis. Zie door de gebruiker gedefinieerde functies (UDF's) in Unity Catalog.

Een functie registreren als een UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

U kunt desgewenst het retourtype van uw UDF set. Het standaard retourtype is StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

De UDF aanroepen in Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

UDF gebruiken met DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

U kunt ook dezelfde UDF declareren met behulp van de aantekeningsyntaxis:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Evaluatievolgorde en null-controle

Spark SQL (inclusief SQL en de DataFrame- en Gegevensset-API) garandeert niet de volgorde van evaluatie van subexpressies. Met name worden de invoer van een operator of functie niet noodzakelijkerwijs van links naar rechts of in een andere vaste volgorde geƫvalueerd. Logische en AND expressies hebben bijvoorbeeld OR geen semantiek van links-naar-rechts 'kortsluiting'.

Daarom is het gevaarlijk om te vertrouwen op de bijwerkingen of de volgorde van evaluatie van Boole-expressies, en de volgorde van WHERE en HAVING componenten, omdat dergelijke expressies en componenten tijdens queryoptimalisatie en -planning opnieuw kunnen worden gerangschikt. Als een UDF afhankelijk is van kortsluitingssemantiek in SQL voor null-controle, is er geen garantie dat de null-controle plaatsvindt voordat de UDF wordt aanroepen. Bijvoorbeeld:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Deze WHERE component garandeert niet dat de strlen UDF wordt aangeroepen nadat null-waarden zijn uitgefilterd.

Als u de juiste null-controle wilt uitvoeren, raden we u aan een van de volgende handelingen uit te voeren:

  • De UDF zelf null-aware maken en null-controle uitvoeren in de UDF zelf
  • Gebruik IF of CASE WHEN expressies om de null-controle uit te voeren en de UDF aan te roepen in een voorwaardelijke vertakking
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Beperkingen

De volgende beperkingen gelden voor PySpark UDF's:

  • importbeperkingen voor modules: PySpark UDF's op gedeelde clusters en serverloze compute hebben geen toegang tot Git-mappen, werkruimtebestanden of Unity-CatalogVolumes om modules te importeren in Databricks Runtime 14.2 en lager.

  • Broadcast-variabelen: PySpark UDF's op gedeelde clusters en serverloze berekeningen bieden geen ondersteuning voor broadcastvariabelen.

  • Geheugen limit: PySpark UDF's op serverloze berekeningen hebben een geheugen limit van 1 GB per PySpark UDF. Het overschrijden van deze limit resulteert in de volgende fout: [UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.