Freigeben über


Benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) in Python

Wichtig

Dieses Feature befindet sich in Databricks Runtime 14.3 LTS und höher in der Public Preview.

Mit einer benutzerdefinierten Tabellenfunktion (UDTF) können Sie Funktionen registrieren, die Tabellen anstelle von Skalarwerten zurückgeben. Im Gegensatz zu Skalarwertfunktionen, die einen einzelnen Ergebniswert aus jedem Aufruf zurückgeben, wird jede UDTF in der FROM-Klausel einer SQL-Anweisung aufgerufen und gibt eine gesamte Tabelle als Ausgabe zurück.

Jeder UDTF-Aufruf kann null oder mehr Argumente akzeptieren. Diese Argumente können Skalarausdrücke oder Tabellenargumente sein, die ganze Eingabetabellen darstellen.

Grundlegende UDTF-Syntax

Apache Spark implementiert Python-UDTFs als Python-Klassen mit einer obligatorischen eval-Methode, die yield verwendet, um Ausgabezeilen auszugeben.

Um Ihre Klasse als UDTF zu verwenden, müssen Sie die PySpark-Funktion udtf importieren. Databricks empfiehlt die Verwendung dieser Funktion als Decorator-Element und die explizite Angabe von Feldnamen und Typen mithilfe der returnType-Option (es sei denn, die Klasse definiert eine analyze-Methode, wie in einem späteren Abschnitt beschrieben).

Mit der folgenden UDTF wird eine Tabelle mit einer festen Liste von zwei ganzzahligen Argumenten erstellt:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registrieren einer UDTF

UDTFs werden in der lokalen SparkSession registriert und sind auf Notebook- oder Auftragsebene isoliert.

Sie können UDTFs nicht als Objekte im Unity-Katalog registrieren, und UDTFs können nicht mit SQL-Warehouses verwendet werden.

Mit der Funktion spark.udtf.register() können Sie eine UDTF für die Verwendung in SQL-Abfragen im aktuellen SparkSession registrieren. Geben Sie einen Namen für die SQL-Funktion und die Python-UDTF-Klasse an.

spark.udtf.register("get_sum_diff", GetSumDiff)

Aufrufen einer registrierten UDTF

Nach der Registrierung können Sie die UDTF in SQL verwenden, entweder mit dem magischen Befehl %sql oder der Funktion spark.sql():

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Verwenden von Apache Arrow

Wenn Ihre UDTF eine kleine Menge an Daten als Input erhält, aber eine große Tabelle ausgibt, empfiehlt Databricks die Verwendung von Apache Arrow. Sie können dies aktivieren, indem Sie den useArrow-Parameter beim Deklarieren der UDTF angeben:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Variablenargumentlisten – *args und **kwargs

Sie können die *args- oder **kwargs-Syntax von Python verwenden und Logik implementieren, um eine nicht festgelegte Anzahl von Eingabewerten zu verarbeiten.

Im folgenden Beispiel wird dasselbe Ergebnis zurückgegeben, während die Eingabelänge und die Eingabetypen für die Argumente explizit überprüft werden:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Hier ist das gleiche Beispiel, aber mit Verwendung von Schlüsselwortargumenten:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definieren eines statischen Schemas zur Registrierungszeit

Die UDTF gibt Zeilen mit einem Ausgabeschema zurück, das eine sortierte Sequenz von Spaltennamen und -typen umfasst. Wenn das UDTF-Schema für alle Abfragen immer gleich bleiben soll, können Sie ein statisches, festes Schema nach dem Decorator-Element @udtf angeben. Es muss entweder ein StructType sein:

StructType().add("c1", StringType())

Oder eine DDL-Zeichenfolge, die einen Strukturtyp darstellt:

c1: string

Berechnen eines dynamischen Schemas zur Funktionsaufrufzeit

UDTFs können das Ausgabeschema auch programmgesteuert für jeden Aufruf abhängig von den Werten der Eingabeargumente berechnen. Definieren Sie dazu eine statische Methode mit Namen analyze, die null oder mehr Parameter akzeptiert, die den Argumenten entsprechen, die dem spezifischen UDTF-Aufruf übergeben wurden.

Jedes Argument der analyze-Methode ist eine Instanz der Klasse AnalyzeArgument, welche die folgenden Felder enthält:

AnalyzeArgument-Klassenfeld Beschreibung
dataType Der Typ des Eingabearguments als ein DataType. Bei Eingabetabellenargumenten ist dies ein StructType, welche die Spalten der Tabelle darstellt.
value Der Wert des Eingabearguments als eine Optional[Any]. Dies entspricht None für Tabellenargumente oder literale Skalarargumente, die nicht konstant sind.
isTable Gibt an, ob das Eingabeargument eine Tabelle als ein BooleanType ist.
isConstantExpression Gibt an, ob das Eingabeargument ein konstant faltbarer Ausdruck als ein BooleanType ist.

Die analyze-Methode gibt eine Instanz der Klasse AnalyzeResult zurück, die das Schema der Ergebnistabelle als ein StructType sowie einige optionale Felder enthält. Wenn die UDTF ein Eingabetabellenargument akzeptiert, dann kann die AnalyzeResult auch eine angeforderte Möglichkeit zum Partitionieren und Anordnen der Zeilen der Eingabetabelle über mehrere UDTF-Aufrufe hinweg enthalten, wie weiter unten beschrieben.

AnalyzeResult-Klassenfeld Beschreibung
schema Das Schema der Ergebnistabelle als ein StructType.
withSinglePartition Gibt an, ob alle Eingabezeilen an dieselbe UDTF-Klasseninstanz als ein BooleanType gesendet werden sollen.
partitionBy Wenn dieser Wert auf „nicht leer“ festgelegt ist, werden alle Zeilen mit jeder eindeutigen Kombination aus Werten der Partitionierungsausdrücke von einer separaten Instanz der UDTF-Klasse konsumiert.
orderBy Wenn dieser Wert auf „nicht leer“ festgelegt ist, gibt dies eine Reihenfolge der Zeilen innerhalb jeder Partition an.
select Wenn dieser Satz auf „nicht leer“ festgelegt ist, handelt es sich um eine Sequenz von Ausdrücken, welche die UDTF für Catalyst angibt, um sie für die Spalten im Eingabeargument TABLE auszuwerten. Die UDTF empfängt ein Eingabeattribute für jeden Namen in der Liste in der Reihenfolge, in der sie aufgelistet sind.

In diesem Beispiel analyze wird für jedes Wort im Eingabezeichenfolgeargument eine Ausgabespalte zurückgegeben.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Weiterleiten des Status an zukünftige eval-Aufrufe

Die Methode analyze kann als praktischer Ort für die Initialisierung dienen und die Ergebnisse dann an zukünftige eval-Methodenaufrufe für denselben UDTF-Aufruf weiterleiten.

Erstellen Sie dazu eine Unterklasse von AnalyzeResult, und geben Sie eine Instanz der Unterklasse aus der Methode analyze zurück. Fügen Sie dann der Methode __init__ ein zusätzliches Argument hinzu, um diese Instanz zu akzeptieren.

In diesem Beispiel analyze wird ein konstantes Ausgabeschema zurückgegeben, fügt aber benutzerdefinierte Informationen in den Ergebnismetadaten hinzu, die von zukünftigen __init__-Methodenaufrufen konsumiert werden sollen:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Ausgabezeilen liefern

Die Methode eval wird einmal für jede Zeile des Eingabetabellenarguments ausgeführt (oder nur einmal, wenn kein Tabellenargument angegeben wird), gefolgt von einem Aufruf der Methode terminate am Ende. Beide Methoden geben null oder mehr Zeilen aus, die dem Ergebnisschema entsprechen, indem sie Tupel, Listen oder pyspark.sql.Row-Objekte liefern.

Dieses Beispiel gibt eine Zeile zurück, indem ein Tupel mit drei Elementen bereitgestellt wird:

def eval(self, x, y, z):
  yield (x, y, z)

Sie können die Klammern auch weglassen:

def eval(self, x, y, z):
  yield x, y, z

Fügen Sie ein nachfolgendes Komma hinzu, um eine Zeile mit nur einer Spalte zurückzugeben:

def eval(self, x, y, z):
  yield x,

Sie können auch ein pyspark.sql.Row-Objekt liefern.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

In diesem Beispiel werden Ausgabezeilen aus der Methode terminate mithilfe einer Python-Liste geliefert. Zu diesem Zweck können Sie den Zustand innerhalb der Klasse aus früheren Schritten der UDTF-Auswertung speichern.

def terminate(self):
  yield [self.x, self.y, self.z]

Übergeben von skalaren Argumenten an eine UDTF

Sie können Skalarargumente als konstante Ausdrücke mit literalen Werten oder darauf basierenden Funktionen an eine UDTF übergeben. Zum Beispiel:

SELECT * FROM udtf(42, group => upper("finance_department"));

Übergeben von Tabellenargumenten an eine UDTF

Python-UDTFs können zusätzlich zu skalaren Eingabeargumenten auch eine Eingabetabelle als Argument akzeptieren. Eine einzelne UDTF kann auch ein Tabellenargument und mehrere Skalarargumente akzeptieren.

Anschließend kann jede SQL-Abfrage eine Eingabetabelle mithilfe des Schlüsselworts TABLE bereitstellen, gefolgt von Klammern, die einen entsprechenden Tabellenbezeichner umgeben, z. B. TABLE(t). Alternativ können Sie eine Tabellenunterabfrage wie TABLE(SELECT a, b, c FROM t) oder TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)) übergeben.

Das Eingabetabellenargument wird dann als pyspark.sql.Row-Argument für die Methode eval mit einem Aufruf zur Methode eval für jede Zeile in der Eingabetabelle dargestellt. Sie können standardmäßige PySpark-Spaltenfeldanmerkungen verwenden, um mit Spalten in jeder Zeile zu interagieren. Das folgende Beispiel veranschaulicht den expliziten Import des PySpark-Typs Row und anschließendes Filtern der übergebenen Tabelle auf das Feld id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Verwenden Sie zum Abfragen der Funktion das SQL-Schlüsselwort TABLE:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Angeben einer Partitionierung der Eingabezeilen aus Funktionsaufrufen

Beim Aufrufen einer UDTF mit einem Tabellenargument kann jede SQL-Abfrage die Eingabetabelle über mehrere UDTF-Aufrufe hinweg basierend auf den Werten einer oder mehrerer Eingabetabellenspalten partitionieren.

Verwenden Sie zum Angeben einer Partition die Klausel PARTITION BY im Funktionsaufruf nach dem TABLE-Argument. Dadurch wird sichergestellt, dass alle Eingabezeilen mit jeder eindeutigen Kombination von Werten der Partitionierungsspalten von genau einer Instanz der UDTF-Klasse konsumiert werden.

Beachten Sie, dass die Klausel PARTITION BY zusätzlich zu einfachen Spaltenverweisen auch beliebige Ausdrücke basierend auf Eingabetabellenspalten akzeptiert. Sie können z. B. die LENGTH einer Zeichenfolge angeben, einen Monat aus einem Datum extrahieren oder zwei Werte verketten.

Es ist auch möglich, WITH SINGLE PARTITION anstelle von PARTITION BY anzugeben, um nur eine Partition anzufordern, bei der alle Eingabezeilen von genau einer Instanz der UDTF-Klasse konsumiert werden müssen.

Innerhalb jeder Partition können Sie optional eine erforderliche Sortierung der Eingabezeilen angeben, wenn sie von der UDTF-Methode eval konsumiert werden. Geben Sie dazu eine Klausel ORDER BY nach der oben beschriebenen Klausel PARTITION BY oder WITH SINGLE PARTITION an.

Betrachten Sie zum Beispiel die folgende UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Sie können die Partitionierungsoptionen beim Aufruf der UDTF über die Eingabetabelle auf mehrfache Weise angeben:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Angeben einer Partitionierung der Eingabezeilen aus der Methode analyze

Beachten Sie, dass für jede der oben genannten Möglichkeiten zum Partitionieren der Eingabetabelle beim Aufrufen von UDTFs in SQL-Abfragen eine entsprechende Möglichkeit für die UDTF-Methode analyze vorhanden ist, um stattdessen die gleiche Partitionierungsmethode automatisch anzugeben.

  • Anstatt eine UDTF als SELECT * FROM udtf(TABLE(t) PARTITION BY a) aufzurufen, können Sie die Methode analyze aktualisieren, um das Feld partitionBy=[PartitioningColumn("a")] festzulegen und die Funktion einfach mithilfe von SELECT * FROM udtf(TABLE(t)) aufzurufen.
  • Entsprechend können Sie, anstatt TABLE(t) WITH SINGLE PARTITION ORDER BY b in der SQL-Abfrage anzugeben, analyze die Felder withSinglePartition=true und orderBy=[OrderingColumn("b")] festlegen lassen und dann einfach TABLE(t) übergeben.
  • Anstatt TABLE(SELECT a FROM t) in der SQL-Abfrage zu übergeben, können Sie analyze den Wert select=[SelectedColumn("a")] festlegen lassen und dann einfach TABLE(t) übergeben.

Im folgenden Beispiel gibt analyze ein konstantes Ausgabenschema zurück, wählt eine Teilmenge von Spalten aus der Eingabetabelle aus, und gibt an, dass die Eingabetabelle über mehrere UDTF-Aufrufe hinweg basierend auf den Werten der Spalte date partitioniert ist:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])