Delen via


Door de gebruiker gedefinieerde Python-tabelfuncties (UDF's)

Belangrijk

Deze functie bevindt zich in openbare preview in Databricks Runtime 14.3 LTS en hoger.

Met een door de gebruiker gedefinieerde tabelfunctie (UDTF) kunt u functies registreren die tabellen retourneren in plaats van scalaire waarden. In tegenstelling tot scalaire functies die één resultaatwaarde retourneren uit elke aanroep, wordt elke UDTF aangeroepen in de component van FROM een SQL-instructie en wordt een hele tabel als uitvoer geretourneerd.

Elke UDTF-aanroep kan nul of meer argumenten accepteren. Deze argumenten kunnen scalaire expressies of tabelargumenten zijn die volledige invoertabellen vertegenwoordigen.

Eenvoudige UDTF-syntaxis

Apache Spark implementeert Python UDFS als Python-klassen met een verplichte eval methode die wordt gebruikt yield om uitvoerrijen te verzenden.

Als u uw klasse wilt gebruiken als UDTF, moet u de Functie PySpark udtf importeren. Databricks raadt aan deze functie als decorator te gebruiken en expliciet veldnamen en -typen op te geven met behulp van de returnType optie (tenzij de klasse een analyze methode definieert zoals beschreven in een latere sectie).

Met de volgende UDTF maakt u een tabel met een vaste lijst met twee gehele getallen:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Een UDTF registreren

UDDF's worden geregistreerd bij de lokale SparkSession en worden geïsoleerd op notebook- of taakniveau.

U KUNT UDDF's niet registreren als objecten in Unity Catalog en UDDF's kunnen niet worden gebruikt met SQL-warehouses.

U kunt een UDTF registreren bij de huidige SparkSession voor gebruik in SQL-query's met de functie spark.udtf.register(). Geef een naam op voor de SQL-functie en de Python UDTF-klasse.

spark.udtf.register("get_sum_diff", GetSumDiff)

Een geregistreerde UDTF aanroepen

Nadat u zich hebt geregistreerd, kunt u de UDTF in SQL gebruiken met behulp van de %sql magic-opdracht of spark.sql() functie:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Apache Arrow gebruiken

Als uw UDTF een kleine hoeveelheid gegevens ontvangt als invoer, maar een grote tabel uitvoert, raadt Databricks aan Apache Arrow te gebruiken. U kunt deze inschakelen door de useArrow parameter op te geven bij het declareren van de UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Lijsten met variabelenargumenten - *args en **kwargs

U kunt Python *args of **kwargs syntaxis gebruiken en logica implementeren om een niet-opgegeven aantal invoerwaarden te verwerken.

In het volgende voorbeeld wordt hetzelfde resultaat geretourneerd terwijl de invoerlengte en typen voor de argumenten expliciet worden gecontroleerd:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Dit is hetzelfde voorbeeld, maar het gebruik van trefwoordargumenten:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Een statisch schema definiëren tijdens de registratie

De UDTF retourneert rijen met een uitvoerschema dat bestaat uit een geordende reeks kolomnamen en -typen. Als het UDTF-schema altijd hetzelfde moet blijven voor alle query's, kunt u een statisch, vast schema opgeven na de @udtf decorator. Het moet een StructTypevan de volgende zijn:

StructType().add("c1", StringType())

Of een DDL-tekenreeks die een structtype vertegenwoordigt:

c1: string

Een dynamisch schema berekenen tijdens het aanroepen van functies

UDDF's kunnen het uitvoerschema ook programmatisch berekenen voor elke aanroep, afhankelijk van de waarden van de invoerargumenten. Hiervoor definieert u een statische methode analyze die nul of meer parameters accepteert die overeenkomen met de argumenten die zijn opgegeven voor de specifieke UDTF-aanroep.

Elk argument van de analyze methode is een exemplaar van de AnalyzeArgument klasse die de volgende velden bevat:

AnalyzeArgument klasseveld Beschrijving
dataType Het type van het invoerargument als een DataType. Voor invoertabelargumenten is dit een StructType weergave van de kolommen van de tabel.
value De waarde van het invoerargument als een Optional[Any]. Dit is None voor tabelargumenten of letterlijke scalaire argumenten die niet constant zijn.
isTable Of het invoerargument een tabel is als een BooleanType.
isConstantExpression Of het invoerargument een constante vouwbare expressie is als een BooleanType.

De analyze methode retourneert een exemplaar van de AnalyzeResult klasse, dat het schema van de resultaattabel bevat als plus StructType enkele optionele velden. Als de UDTF een invoertabelargument accepteert, kan het AnalyzeResult ook een aangevraagde manier bevatten om de rijen van de invoertabel te partitioneren en te ordenen voor verschillende UDTF-aanroepen, zoals later wordt beschreven.

AnalyzeResult klasseveld Beschrijving
schema Het schema van de resultaattabel als een StructType.
withSinglePartition Of alle invoerrijen moeten worden verzonden naar hetzelfde UDTF-klasse-exemplaar als een BooleanType.
partitionBy Als deze optie is ingesteld op niet-leeg, worden alle rijen met elke unieke combinatie van waarden van de partitioneringsexpressies gebruikt door een afzonderlijk exemplaar van de UDTF-klasse.
orderBy Als deze optie is ingesteld op niet-leeg, geeft dit een volgorde van rijen binnen elke partitie op.
select Als dit is ingesteld op niet-leeg, is dit een reeks expressies die de UDTF opgeeft voor Katalysator om te evalueren op basis van de kolommen in het argument invoerTABEL. De UDTF ontvangt één invoerkenmerk voor elke naam in de lijst in de volgorde waarin ze worden vermeld.

In dit analyze voorbeeld wordt één uitvoerkolom geretourneerd voor elk woord in het argument invoertekenreeks.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Status doorschakelen naar toekomstige eval aanroepen

De analyze methode kan fungeren als een handige plaats om initialisatie uit te voeren en de resultaten vervolgens door te sturen naar toekomstige eval methode-aanroepen voor dezelfde UDTF-aanroep.

Hiervoor maakt u een subklasse van AnalyzeResult en retourneert u een exemplaar van de subklasse van de analyze methode. Voeg vervolgens een extra argument toe aan de __init__ methode om dat exemplaar te accepteren.

In dit analyze voorbeeld wordt een constant uitvoerschema geretourneerd, maar worden aangepaste gegevens toegevoegd in de metagegevens van de resultaten die moeten worden gebruikt door toekomstige __init__ methode-aanroepen:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Uitvoerrijen opleveren

De eval methode wordt eenmaal uitgevoerd voor elke rij van het argument invoertabel (of slechts één keer als er geen tabelargument wordt opgegeven), gevolgd door één aanroep van de terminate methode aan het einde. De methode voert nul of meer rijen uit die voldoen aan het resultaatschema door tuples, lijsten of pyspark.sql.Row objecten te genereren.

In dit voorbeeld wordt een rij geretourneerd door een tuple van drie elementen op te geven:

def eval(self, x, y, z):
  yield (x, y, z)

U kunt ook de haakjes weglaten:

def eval(self, x, y, z):
  yield x, y, z

Voeg een volgkomma toe om een rij met slechts één kolom te retourneren:

def eval(self, x, y, z):
  yield x,

U kunt ook een pyspark.sql.Row object opleveren.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

In dit voorbeeld worden uitvoerrijen van de terminate methode geretourneerd met behulp van een Python-lijst. U kunt de status in de klasse opslaan uit eerdere stappen in de UDTF-evaluatie voor dit doel.

def terminate(self):
  yield [self.x, self.y, self.z]

Scalaire argumenten doorgeven aan een UDTF

U kunt scalaire argumenten doorgeven aan een UDTF als constante expressies die letterlijke waarden of functies bevatten op basis hiervan. Voorbeeld:

SELECT * FROM udtf(42, group => upper("finance_department"));

Tabelargumenten doorgeven aan een UDTF

Python UDFS kunnen naast scalaire invoerargumenten een invoertabel als argument accepteren. Eén UDTF kan ook een tabelargument en meerdere scalaire argumenten accepteren.

Vervolgens kan elke SQL-query een invoertabel opgeven met behulp van het TABLE trefwoord gevolgd door haakjes rond een geschikte tabel-id, zoals TABLE(t). U kunt ook een tabelsubquery doorgeven, zoals TABLE(SELECT a, b, c FROM t) of TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Het argument invoertabel wordt vervolgens weergegeven als een pyspark.sql.Row argument voor de eval methode, met één aanroep naar de eval methode voor elke rij in de invoertabel. U kunt standaardaantekeningen voor pySpark-kolomvelden gebruiken om te communiceren met kolommen in elke rij. In het volgende voorbeeld ziet u hoe u het PySpark-type Row expliciet importeert en vervolgens de doorgegeven tabel in het id veld filtert:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Als u een query wilt uitvoeren op de functie, gebruikt u het TABLE trefwoord SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Een partitionering van de invoerrijen van functie-aanroepen opgeven

Wanneer u een UDTF aanroept met een tabelargument, kan elke SQL-query de invoertabel partitioneren in verschillende UDTF-aanroepen op basis van de waarden van een of meer invoertabelkolommen.

Als u een partitie wilt opgeven, gebruikt u de PARTITION BY component in de functieaanroep na het TABLE argument. Dit garandeert dat alle invoerrijen met elke unieke combinatie van waarden van de partitioneringskolommen worden verbruikt door precies één exemplaar van de UDTF-klasse.

Naast eenvoudige kolomverwijzingen accepteert de PARTITION BY component ook willekeurige expressies op basis van invoertabelkolommen. U kunt bijvoorbeeld de LENGTH tekenreeks opgeven, een maand uit een datum extraheren of twee waarden samenvoegen.

Het is ook mogelijk om op te geven WITH SINGLE PARTITION in plaats van PARTITION BY slechts één partitie aan te vragen waarbij alle invoerrijen moeten worden gebruikt door precies één exemplaar van de UDTF-klasse.

Binnen elke partitie kunt u desgewenst een vereiste volgorde van de invoerrijen opgeven, omdat de methode UDTF eval deze gebruikt. Als u dit wilt doen, geeft u een ORDER BY component op na de PARTITION BY hierboven beschreven of WITH SINGLE PARTITION component.

Denk bijvoorbeeld aan de volgende UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

U kunt partitioneringsopties opgeven bij het aanroepen van de UDTF via de invoertabel op meerdere manieren:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Een partitionering van de invoerrijen van de analyze methode opgeven

Houd er rekening mee dat voor elk van de bovenstaande manieren om de invoertabel te partitioneren bij het aanroepen van UDTF's in SQL-query's, er een overeenkomstige manier is voor de methode van de UDTF analyze om in plaats daarvan automatisch dezelfde partitioneringsmethode op te geven.

  • In plaats van een UDTF aan te roepen, SELECT * FROM udtf(TABLE(t) PARTITION BY a)kunt u de analyze methode bijwerken om het veld partitionBy=[PartitioningColumn("a")] in te stellen en gewoon de functie aan te roepen met behulp van SELECT * FROM udtf(TABLE(t)).
  • Met hetzelfde token, in plaats van op te geven TABLE(t) WITH SINGLE PARTITION ORDER BY b in de SQL-query, kunt analyze u de velden withSinglePartition=true instellen en orderBy=[OrderingColumn("b")] vervolgens gewoon doorgeven TABLE(t).
  • In plaats van de SQL-query door te geven TABLE(SELECT a FROM t) , kunt analyze u instellen select=[SelectedColumn("a")] en vervolgens gewoon doorgeven TABLE(t).

In het volgende voorbeeld analyze wordt een constant uitvoerschema geretourneerd, wordt een subset van kolommen uit de invoertabel geselecteerd en wordt aangegeven dat de invoertabel wordt gepartitioneerd voor verschillende UDTF-aanroepen op basis van de waarden van de date kolom:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])