Benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) in Python
Wichtig
Dieses Feature befindet sich in Databricks Runtime 14.3 LTS und höher in der Public Preview.
Mit einer benutzerdefinierten Tabellenfunktion (UDTF) können Sie Funktionen registrieren, die Tabellen anstelle von Skalarwerten zurückgeben. Im Gegensatz zu Skalarwertfunktionen, die einen einzelnen Ergebniswert aus jedem Aufruf zurückgeben, wird jede UDTF in der FROM
-Klausel einer SQL-Anweisung aufgerufen und gibt eine gesamte Tabelle als Ausgabe zurück.
Jeder UDTF-Aufruf kann null oder mehr Argumente akzeptieren. Diese Argumente können Skalarausdrücke oder Tabellenargumente sein, die ganze Eingabetabellen darstellen.
Grundlegende UDTF-Syntax
Apache Spark implementiert Python-UDTFs als Python-Klassen mit einer obligatorischen eval
-Methode, die yield
verwendet, um Ausgabezeilen auszugeben.
Um Ihre Klasse als UDTF zu verwenden, müssen Sie die PySpark-Funktion udtf
importieren. Databricks empfiehlt die Verwendung dieser Funktion als Decorator-Element und die explizite Angabe von Feldnamen und Typen mithilfe der returnType
-Option (es sei denn, die Klasse definiert eine analyze
-Methode, wie in einem späteren Abschnitt beschrieben).
Mit der folgenden UDTF wird eine Tabelle mit einer festen Liste von zwei ganzzahligen Argumenten erstellt:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrieren einer UDTF
UDTFs werden in der lokalen SparkSession
registriert und sind auf Notebook- oder Auftragsebene isoliert.
Sie können UDTFs nicht als Objekte im Unity-Katalog registrieren, und UDTFs können nicht mit SQL-Warehouses verwendet werden.
Mit der Funktion spark.udtf.register()
können Sie eine UDTF für die Verwendung in SQL-Abfragen im aktuellen SparkSession
registrieren. Geben Sie einen Namen für die SQL-Funktion und die Python-UDTF-Klasse an.
spark.udtf.register("get_sum_diff", GetSumDiff)
Aufrufen einer registrierten UDTF
Nach der Registrierung können Sie die UDTF in SQL verwenden, entweder mit dem magischen Befehl %sql
oder der Funktion spark.sql()
:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Verwenden von Apache Arrow
Wenn Ihre UDTF eine kleine Menge an Daten als Input erhält, aber eine große Tabelle ausgibt, empfiehlt Databricks die Verwendung von Apache Arrow. Sie können dies aktivieren, indem Sie den useArrow
-Parameter beim Deklarieren der UDTF angeben:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Variablenargumentlisten – *args und **kwargs
Sie können die *args
- oder **kwargs
-Syntax von Python verwenden und Logik implementieren, um eine nicht festgelegte Anzahl von Eingabewerten zu verarbeiten.
Im folgenden Beispiel wird dasselbe Ergebnis zurückgegeben, während die Eingabelänge und die Eingabetypen für die Argumente explizit überprüft werden:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Hier ist das gleiche Beispiel, aber mit Verwendung von Schlüsselwortargumenten:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definieren eines statischen Schemas zur Registrierungszeit
Die UDTF gibt Zeilen mit einem Ausgabeschema zurück, das eine sortierte Sequenz von Spaltennamen und -typen umfasst. Wenn das UDTF-Schema für alle Abfragen immer gleich bleiben soll, können Sie ein statisches, festes Schema nach dem Decorator-Element @udtf
angeben. Es muss entweder ein StructType
sein:
StructType().add("c1", StringType())
Oder eine DDL-Zeichenfolge, die einen Strukturtyp darstellt:
c1: string
Berechnen eines dynamischen Schemas zur Funktionsaufrufzeit
UDTFs können das Ausgabeschema auch programmgesteuert für jeden Aufruf abhängig von den Werten der Eingabeargumente berechnen. Definieren Sie dazu eine statische Methode mit Namen analyze
, die null oder mehr Parameter akzeptiert, die den Argumenten entsprechen, die dem spezifischen UDTF-Aufruf übergeben wurden.
Jedes Argument der analyze
-Methode ist eine Instanz der Klasse AnalyzeArgument
, welche die folgenden Felder enthält:
AnalyzeArgument -Klassenfeld |
Beschreibung |
---|---|
dataType |
Der Typ des Eingabearguments als ein DataType . Bei Eingabetabellenargumenten ist dies ein StructType , welche die Spalten der Tabelle darstellt. |
value |
Der Wert des Eingabearguments als eine Optional[Any] . Dies entspricht None für Tabellenargumente oder literale Skalarargumente, die nicht konstant sind. |
isTable |
Gibt an, ob das Eingabeargument eine Tabelle als ein BooleanType ist. |
isConstantExpression |
Gibt an, ob das Eingabeargument ein konstant faltbarer Ausdruck als ein BooleanType ist. |
Die analyze
-Methode gibt eine Instanz der Klasse AnalyzeResult
zurück, die das Schema der Ergebnistabelle als ein StructType
sowie einige optionale Felder enthält. Wenn die UDTF ein Eingabetabellenargument akzeptiert, dann kann die AnalyzeResult
auch eine angeforderte Möglichkeit zum Partitionieren und Anordnen der Zeilen der Eingabetabelle über mehrere UDTF-Aufrufe hinweg enthalten, wie weiter unten beschrieben.
AnalyzeResult -Klassenfeld |
Beschreibung |
---|---|
schema |
Das Schema der Ergebnistabelle als ein StructType . |
withSinglePartition |
Gibt an, ob alle Eingabezeilen an dieselbe UDTF-Klasseninstanz als ein BooleanType gesendet werden sollen. |
partitionBy |
Wenn dieser Wert auf „nicht leer“ festgelegt ist, werden alle Zeilen mit jeder eindeutigen Kombination aus Werten der Partitionierungsausdrücke von einer separaten Instanz der UDTF-Klasse konsumiert. |
orderBy |
Wenn dieser Wert auf „nicht leer“ festgelegt ist, gibt dies eine Reihenfolge der Zeilen innerhalb jeder Partition an. |
select |
Wenn dieser Satz auf „nicht leer“ festgelegt ist, handelt es sich um eine Sequenz von Ausdrücken, welche die UDTF für Catalyst angibt, um sie für die Spalten im Eingabeargument TABLE auszuwerten. Die UDTF empfängt ein Eingabeattribute für jeden Namen in der Liste in der Reihenfolge, in der sie aufgelistet sind. |
In diesem Beispiel analyze
wird für jedes Wort im Eingabezeichenfolgeargument eine Ausgabespalte zurückgegeben.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Weiterleiten des Status an zukünftige eval
-Aufrufe
Die Methode analyze
kann als praktischer Ort für die Initialisierung dienen und die Ergebnisse dann an zukünftige eval
-Methodenaufrufe für denselben UDTF-Aufruf weiterleiten.
Erstellen Sie dazu eine Unterklasse von AnalyzeResult
, und geben Sie eine Instanz der Unterklasse aus der Methode analyze
zurück.
Fügen Sie dann der Methode __init__
ein zusätzliches Argument hinzu, um diese Instanz zu akzeptieren.
In diesem Beispiel analyze
wird ein konstantes Ausgabeschema zurückgegeben, fügt aber benutzerdefinierte Informationen in den Ergebnismetadaten hinzu, die von zukünftigen __init__
-Methodenaufrufen konsumiert werden sollen:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Ausgabezeilen liefern
Die Methode eval
wird einmal für jede Zeile des Eingabetabellenarguments ausgeführt (oder nur einmal, wenn kein Tabellenargument angegeben wird), gefolgt von einem Aufruf der Methode terminate
am Ende. Beide Methoden geben null oder mehr Zeilen aus, die dem Ergebnisschema entsprechen, indem sie Tupel, Listen oder pyspark.sql.Row
-Objekte liefern.
Dieses Beispiel gibt eine Zeile zurück, indem ein Tupel mit drei Elementen bereitgestellt wird:
def eval(self, x, y, z):
yield (x, y, z)
Sie können die Klammern auch weglassen:
def eval(self, x, y, z):
yield x, y, z
Fügen Sie ein nachfolgendes Komma hinzu, um eine Zeile mit nur einer Spalte zurückzugeben:
def eval(self, x, y, z):
yield x,
Sie können auch ein pyspark.sql.Row
-Objekt liefern.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
In diesem Beispiel werden Ausgabezeilen aus der Methode terminate
mithilfe einer Python-Liste geliefert. Zu diesem Zweck können Sie den Zustand innerhalb der Klasse aus früheren Schritten der UDTF-Auswertung speichern.
def terminate(self):
yield [self.x, self.y, self.z]
Übergeben von skalaren Argumenten an eine UDTF
Sie können Skalarargumente als konstante Ausdrücke mit literalen Werten oder darauf basierenden Funktionen an eine UDTF übergeben. Zum Beispiel:
SELECT * FROM udtf(42, group => upper("finance_department"));
Übergeben von Tabellenargumenten an eine UDTF
Python-UDTFs können zusätzlich zu skalaren Eingabeargumenten auch eine Eingabetabelle als Argument akzeptieren. Eine einzelne UDTF kann auch ein Tabellenargument und mehrere Skalarargumente akzeptieren.
Anschließend kann jede SQL-Abfrage eine Eingabetabelle mithilfe des Schlüsselworts TABLE
bereitstellen, gefolgt von Klammern, die einen entsprechenden Tabellenbezeichner umgeben, z. B. TABLE(t)
. Alternativ können Sie eine Tabellenunterabfrage wie TABLE(SELECT a, b, c FROM t)
oder TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
übergeben.
Das Eingabetabellenargument wird dann als pyspark.sql.Row
-Argument für die Methode eval
mit einem Aufruf zur Methode eval
für jede Zeile in der Eingabetabelle dargestellt. Sie können standardmäßige PySpark-Spaltenfeldanmerkungen verwenden, um mit Spalten in jeder Zeile zu interagieren. Das folgende Beispiel veranschaulicht den expliziten Import des PySpark-Typs Row
und anschließendes Filtern der übergebenen Tabelle auf das Feld id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Verwenden Sie zum Abfragen der Funktion das SQL-Schlüsselwort TABLE
:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Angeben einer Partitionierung der Eingabezeilen aus Funktionsaufrufen
Beim Aufrufen einer UDTF mit einem Tabellenargument kann jede SQL-Abfrage die Eingabetabelle über mehrere UDTF-Aufrufe hinweg basierend auf den Werten einer oder mehrerer Eingabetabellenspalten partitionieren.
Verwenden Sie zum Angeben einer Partition die Klausel PARTITION BY
im Funktionsaufruf nach dem TABLE
-Argument.
Dadurch wird sichergestellt, dass alle Eingabezeilen mit jeder eindeutigen Kombination von Werten der Partitionierungsspalten von genau einer Instanz der UDTF-Klasse konsumiert werden.
Beachten Sie, dass die Klausel PARTITION BY
zusätzlich zu einfachen Spaltenverweisen auch beliebige Ausdrücke basierend auf Eingabetabellenspalten akzeptiert. Sie können z. B. die LENGTH
einer Zeichenfolge angeben, einen Monat aus einem Datum extrahieren oder zwei Werte verketten.
Es ist auch möglich, WITH SINGLE PARTITION
anstelle von PARTITION BY
anzugeben, um nur eine Partition anzufordern, bei der alle Eingabezeilen von genau einer Instanz der UDTF-Klasse konsumiert werden müssen.
Innerhalb jeder Partition können Sie optional eine erforderliche Sortierung der Eingabezeilen angeben, wenn sie von der UDTF-Methode eval
konsumiert werden. Geben Sie dazu eine Klausel ORDER BY
nach der oben beschriebenen Klausel PARTITION BY
oder WITH SINGLE PARTITION
an.
Betrachten Sie zum Beispiel die folgende UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Sie können die Partitionierungsoptionen beim Aufruf der UDTF über die Eingabetabelle auf mehrfache Weise angeben:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Angeben einer Partitionierung der Eingabezeilen aus der Methode analyze
Beachten Sie, dass für jede der oben genannten Möglichkeiten zum Partitionieren der Eingabetabelle beim Aufrufen von UDTFs in SQL-Abfragen eine entsprechende Möglichkeit für die UDTF-Methode analyze
vorhanden ist, um stattdessen die gleiche Partitionierungsmethode automatisch anzugeben.
- Anstatt eine UDTF als
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
aufzurufen, können Sie die Methodeanalyze
aktualisieren, um das FeldpartitionBy=[PartitioningColumn("a")]
festzulegen und die Funktion einfach mithilfe vonSELECT * FROM udtf(TABLE(t))
aufzurufen. - Entsprechend können Sie, anstatt
TABLE(t) WITH SINGLE PARTITION ORDER BY b
in der SQL-Abfrage anzugeben,analyze
die FelderwithSinglePartition=true
undorderBy=[OrderingColumn("b")]
festlegen lassen und dann einfachTABLE(t)
übergeben. - Anstatt
TABLE(SELECT a FROM t)
in der SQL-Abfrage zu übergeben, können Sieanalyze
den Wertselect=[SelectedColumn("a")]
festlegen lassen und dann einfachTABLE(t)
übergeben.
Im folgenden Beispiel gibt analyze
ein konstantes Ausgabenschema zurück, wählt eine Teilmenge von Spalten aus der Eingabetabelle aus, und gibt an, dass die Eingabetabelle über mehrere UDTF-Aufrufe hinweg basierend auf den Werten der Spalte date
partitioniert ist:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])