Was sind benutzerdefinierte Funktionen (UDFs, User Defined Functions)?
Mit benutzerdefinierten Funktionen (USER-Defined Functions, UDFs) können Sie Code wiederverwenden und freigeben, der integrierte Funktionen in Azure Databricks erweitert. Verwenden Sie UDFs, um bestimmte Aufgaben auszuführen, z. B. komplexe Berechnungen, Transformationen oder benutzerdefinierte Datenmanipulationen.
Hinweis
Auf Clustern mit freigegebenem Zugriffsmodus werden Python skalare UDFs in Databricks Runtime 13.3 LTS und höher unterstützt, während Scala UDFs in Databricks Runtime 14.2 und höher unterstützt werden.
Python skalare UDFs können im Unity-Katalog mithilfe der SQL-Syntax in Databricks Runtime 13.3 LTS und höher registriert werden. Weitere Informationen finden Sie unter User-defined functions (UDFs) in Unity Catalog (Benutzerdefinierte Funktionen (UDFs) in Unity Catalog).
Wann sollten Sie eine UDF verwenden?
Verwenden Sie UDFs für Logik, die mit integrierten Apache Spark-Funktionen schwer ausgedrückt werden kann. Integrierte Apache Spark-Funktionen sind für die verteilte Verarbeitung optimiert und bieten in der Regel eine bessere Leistung im Großen und Ganzen. Weitere Informationen finden Sie unter Funktionen.
Databricks empfiehlt UDFs für Ad-hoc-Abfragen, manuelle Datenbereinigung, explorative Datenanalyse und Vorgänge für kleine bis mittlere Datasets. Häufige Anwendungsfälle für UDFs sind Datenverschlüsselung und -entschlüsselung, Hashing, JSON-Analyse und Validierung.
Verwenden Sie Apache Spark-Methoden für Vorgänge in sehr großen Datasets und alle Workloads, die regelmäßig oder kontinuierlich ausgeführt werden, einschließlich ETL-Aufträgen und Streamingvorgängen.
Registrierte und sitzungsbezogene UDFs
UDFs, die mit SQL erstellt wurden, werden im Unity-Katalog registriert und verfügen über zugehörige Berechtigungen, während UDFs, die in Ihrem Notizbuch erstellt wurden, sitzungsbasiert sind und auf die aktuelle SparkSession festgelegt sind.
Sie können sitzungsbasierte UDFs mithilfe einer beliebigen sprache definieren und darauf zugreifen, die von Azure Databricks unterstützt wird. UDFs können skalar oder nicht skalar sein.
Hinweis
Derzeit sind nur SQL- und Python-Skalar-UDFs, die im Unity-Katalog registriert sind, in DBSQL verfügbar.
Benutzerdefinierte Skalarfunktionen
Skalare UDFs werden für eine einzelne Zeile ausgeführt und geben einen einzelnen Wert für jede Zeile zurück. Im folgenden Beispiel wird eine skalare UDF verwendet, um die Länge der einzelnen Namen in einer name
Spalte zu berechnen und den Wert in einer neuen Spalte name_length
hinzuzufügen:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
So implementieren Sie dies in einem Databricks-Notizbuch mithilfe von PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Weitere Informationen finden Sie unter user-defined functions (UDFs) in Unity Catalog und user-defined scalar functions - Python.
Benutzerdefinierte Aggregatfunktionen (User-Defined Aggregate Functions, UDAFs)
Benutzerdefinierte Aggregatfunktionen (UDAFs) werden für mehrere Zeilen ausgeführt und geben ein einzelnes aggregiertes Ergebnis zurück. Im folgenden Beispiel wird eine UDAF definiert, die Bewertungen aggregiert.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Siehe pandas user-defined functions for Python and User-defined aggregate functions - Scala.
Benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) in Python
Wichtig
Dieses Feature befindet sich in der Public Preview.
Hinweis
Python UDTFs sind in Databricks Runtime 14.3 LTS und höher verfügbar.
Benutzerdefinierte Python-Tabellenfunktionen (USER-Defined Table Functions, UDTFs) können für jede Eingabezeile mehrere Zeilen und Spalten zurückgeben. Im folgenden Beispiel entspricht jeder Wert in der Scorespalte einer Liste von Kategorien. Ein UDTF ist definiert, um die durch Kommas getrennte Liste in mehrere Zeilen aufzuteilen. Siehe benutzerdefinierte Python-Tabellenfunktionen (UDTFs)
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Leistungsüberlegungen
- Integrierte Funktionen und SQL UDFs sind die effizienteste Option.
- Skala UDFs sind im Allgemeinen schneller, wenn sie innerhalb des Java Virtual Machine (JVM) ausgeführt werden, und vermeiden Sie den Aufwand für das Verschieben von Daten in und aus dem JVM.
- Python UDFs und Pandas UDFs neigen dazu, langsamer zu sein als Scala UDFs, da sie daten serialisiert und aus dem JVM in den Python-Dolmetscher verschoben werden müssen. Pandas UDFs bis zu 100x schneller als Python UDFs, da sie Apache Arrow verwenden, um serialisierungskosten zu reduzieren.