Dela via


Användardefinierade python-tabellfunktioner (UDF: er)

Viktig

Den här funktionen finns i offentlig förhandsversion i Databricks Runtime 14.3 LTS och senare.

Med en användardefinierad tabellfunktion (UDTF) kan du registrera funktioner som returnerar tabeller i stället för skalära värden. Till skillnad från skalärfunktioner som returnerar ett enda resultatvärde från varje anrop anropas varje UDTF i sql-instruktionens FROM-sats och returnerar en hel tabell som utdata.

Varje UDTF-anrop kan acceptera noll eller fler argument. Dessa argument kan vara skalära uttryck eller tabellargument som representerar hela indatatabeller.

Grundläggande UDTF-syntax

Apache Spark implementerar Python UDTFs som Python-klasser med en obligatorisk eval-metod som använder yield för att generera utdatarader.

Om du vill använda klassen som UDTF måste du importera funktionen PySpark udtf. Databricks rekommenderar att du använder den här funktionen som dekoratör och uttryckligen anger fältnamn och typer med hjälp av alternativet returnType (såvida inte klassen definierar en analyze-metod enligt beskrivningen i ett senare avsnitt).

Följande UDTF skapar en tabell med en fast lista med två heltalsargument:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registrera en UDTF

UDTF:er är registrerade till det lokala SparkSession och är isolerade på notebook- eller jobbnivå.

Du kan inte registrera UDF:er som objekt i Unity Catalog, och UDF:er kan inte användas med SQL-lager.

Du kan registrera en UDTF till den aktuella SparkSession för användning i SQL-frågor med funktionen spark.udtf.register(). Ange ett namn för SQL-funktionen och Python UDTF-klassen.

spark.udtf.register("get_sum_diff", GetSumDiff)

Anropa en registrerad UDTF

När du har registrerat dig kan du använda UDTF i SQL med antingen det %sql magiska kommandot eller spark.sql() funktionen:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Använda Apache Arrow

Om UDTF tar emot en liten mängd data som indata men matar ut en stor tabell rekommenderar Databricks att du använder Apache Arrow. Du kan aktivera den genom att ange parametern useArrow när du deklarerar UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Variabelargumentlistor – *args och **kwargs

Du kan använda Python-*args eller **kwargs syntax och implementera logik för att hantera ett ospecificerat antal indatavärden.

I följande exempel returneras samma resultat när du uttryckligen kontrollerar indatalängden och typerna för argumenten:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Här är samma exempel, men med nyckelordsargument:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definiera ett statiskt schema vid registreringen

UDTF returnerar rader med ett utdataschema som består av en ordnad sekvens med kolumnnamn och typer. Om UDTF-schemat alltid ska vara detsamma för alla frågor kan du ange ett statiskt, fast schema efter @udtf dekoratör. Det måste antingen vara en StructType:

StructType().add("c1", StringType())

Eller en DDL-sträng som representerar en structtyp:

c1: string

Beräkna ett dynamiskt schema vid funktionsanropstid

UDF:er kan också beräkna utdataschemat programmatiskt för varje anrop beroende på värdena för indataargumenten. För att göra detta definierar du en statisk metod som kallas analyze som accepterar noll eller fler parametrar som motsvarar argumenten som tillhandahålls till det specifika UDTF-anropet.

Varje argument i metoden analyze är en instans av klassen AnalyzeArgument som innehåller följande fält:

AnalyzeArgument klassfält Beskrivning
dataType Typen av indataargument som en DataType. För argument för indatatabeller är detta en StructType som representerar tabellens kolumner.
value Värdet för indataargumentet som en Optional[Any]. Det här är None för tabellargument eller literalskalära argument som inte är konstanta.
isTable Om indataargumentet är en tabell som BooleanType.
isConstantExpression Om indataargumentet är ett konstant vikbart uttryck som en BooleanType.

Metoden analyze returnerar en instans av klassen AnalyzeResult, som innehåller resultattabellens schema som ett StructType plus några valfria fält. Om UDTF accepterar ett argument i indatatabellen kan AnalyzeResult också inkludera ett begärt sätt att partitionera och beställa raderna i indatatabellen över flera UDTF-anrop, enligt beskrivningen senare.

AnalyzeResult klassfält Beskrivning
schema Schemat för resultattabellen som en StructType.
withSinglePartition Om alla indatarader ska skickas till samma UDTF-klassinstans som en BooleanType.
partitionBy Om värdet inte är tomt används alla rader med varje unik kombination av värden för partitioneringsuttrycken av en separat instans av UDTF-klassen.
orderBy Om värdet inte är tomt anger detta en ordning på rader inom varje partition.
select Om värdet inte är tomt är detta en sekvens med uttryck som UDTF anger för Catalyst att utvärdera mot kolumnerna i indata TABLE argumentet. UDTF tar emot ett indataattribut för varje namn i listan i den ordning de visas.

Det här analyze exemplet returnerar en utdatakolumn för varje ord i argumentet för indatasträngen.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Vidarebefordra status till framtida eval-anrop

Metoden analyze kan fungera som en praktisk plats för att utföra initiering och sedan vidarebefordra resultatet till framtida eval metodanrop för samma UDTF-anrop.

För att göra det skapar du en underklass av AnalyzeResult och returnerar en instans av underklassen från metoden analyze. Lägg sedan till ytterligare ett argument i metoden __init__ för att acceptera den instansen.

Det här analyze exemplet returnerar ett konstant utdataschema, men lägger till anpassad information i resultatmetadata som ska användas av framtida __init__-metodanrop:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Ge utdatarader

Metoden eval körs en gång för varje rad i argumentet för indatatabellen (eller bara en gång om inget tabellargument anges), följt av ett anrop av metoden terminate i slutet. Antingen matar metoden ut noll eller fler rader som överensstämmer med resultatschemat genom att ge tupplar, listor eller pyspark.sql.Row objekt.

Det här exemplet returnerar en rad genom att ange en tupl med tre element.

def eval(self, x, y, z):
  yield (x, y, z)

Du kan också utelämna parenteserna:

def eval(self, x, y, z):
  yield x, y, z

Lägg till ett avslutande kommatecken för att returnera en rad med endast en kolumn:

def eval(self, x, y, z):
  yield x,

Du kan också generera ett pyspark.sql.Row-objekt.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Det här exemplet ger utdatarader från metoden terminate med hjälp av en Python-lista. Du kan lagra tillståndet i klassen från tidigare steg i UDTF-utvärderingen för det här ändamålet.

def terminate(self):
  yield [self.x, self.y, self.z]

Skicka skalära argument till en UDTF

Du kan skicka skalära argument till en UDTF som konstanta uttryck som består av literalvärden eller funktioner baserat på dem. Till exempel:

SELECT * FROM udtf(42, group => upper("finance_department"));

Skicka tabellargument till en UDTF

Python-UDF:er kan acceptera en indatatabell som ett argument utöver skalära indataargument. En enskild UDTF kan också acceptera ett tabellargument och flera skalära argument.

Sedan kan alla SQL-frågor tillhandahålla en indatatabell med hjälp av nyckelordet TABLE följt av parenteser som omger en lämplig tabellidentifierare, till exempel TABLE(t). Du kan också skicka en tabellunderfråga, till exempel TABLE(SELECT a, b, c FROM t) eller TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Argumentet för indatatabellen representeras sedan som ett pyspark.sql.Row argument för metoden eval, med ett anrop till metoden eval för varje rad i indatatabellen. Du kan använda standardkommentarer för PySpark-kolumnfält för att interagera med kolumner i varje rad. I följande exempel visas explicit import av PySpark-Row typ och sedan filtrering av den skickade tabellen i fältet id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Om du vill köra frågor mot funktionen använder du nyckelordet TABLE SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Ange en partitionering av indatarader för funktionsanrop

När du anropar en UDTF med ett tabellargument kan alla SQL-frågor partitionera indatatabellen över flera UDTF-anrop baserat på värdena för en eller flera indatatabellkolumner.

Om du vill ange en partition använder du PARTITION BY-satsen i funktionsanropet efter argumentet TABLE. Detta garanterar att alla indatarader med varje unik kombination av värden i partitioneringskolumnerna hanteras av exakt en instans av UDTF-klassen.

Observera att förutom enkla kolumnreferenser accepterar PARTITION BY-satsen även godtyckliga uttryck baserat på indatatabellkolumner. Du kan till exempel ange LENGTH för en sträng, extrahera en månad från ett datum eller sammanfoga två värden.

Det är också möjligt att ange WITH SINGLE PARTITION istället för PARTITION BY för att begära endast en partition där alla indatarader måste konsumeras av exakt en instans av UDTF-klassen.

Inom varje partition kan du ange en obligatorisk ordning på indataraderna eftersom UDTF:s eval-metod använder dem. Det gör du genom att ange en ORDER BY-sats efter satsen PARTITION BY eller WITH SINGLE PARTITION som beskrivs ovan.

Tänk till exempel på följande UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Du kan ange partitioneringsalternativ när du anropar UDTF över indatatabellen på flera olika sätt:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Ange en partitionering av indataraderna från metoden analyze

Observera att för vart och ett av ovanstående sätt att partitionera indatatabellen när du anropar UDF:er i SQL-frågor finns det ett motsvarande sätt för UDTF:s analyze-metod att ange samma partitioneringsmetod automatiskt i stället.

  • I stället för att anropa en UDTF som SELECT * FROM udtf(TABLE(t) PARTITION BY a)kan du uppdatera metoden analyze för att ange fältet partitionBy=[PartitioningColumn("a")] och helt enkelt anropa funktionen med hjälp av SELECT * FROM udtf(TABLE(t)).
  • Med samma token kan du i stället för att ange TABLE(t) WITH SINGLE PARTITION ORDER BY b i SQL-frågan göra analyze ange fälten withSinglePartition=true och orderBy=[OrderingColumn("b")] och sedan bara skicka TABLE(t).
  • I stället för att skicka TABLE(SELECT a FROM t) i SQL-frågan kan du göra analyze ange select=[SelectedColumn("a")] och sedan bara skicka TABLE(t).

I följande exempel returnerar analyze ett konstant utdataschema, väljer en delmängd av kolumner från indatatabellen och anger att indatatabellen är partitionerad över flera UDTF-anrop baserat på värdena i kolumnen date:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])