Dela via


Användardefinierade python-tabellfunktioner (UDF: er)

Viktigt!

Den här funktionen är i offentlig förhandsversion i Databricks Runtime 14.3 LTS och senare.

Med en användardefinierad tabellfunktion (UDTF) kan du registrera funktioner som returnerar tabeller i stället för skalära värden. Till skillnad från skalärfunktioner som returnerar ett enda resultatvärde från varje anrop anropas varje UDTF i en SQL-instruktionssats FROM och returnerar en hel tabell som utdata.

Varje UDTF-anrop kan acceptera noll eller fler argument. Dessa argument kan vara skalära uttryck eller tabellargument som representerar hela indatatabeller.

Grundläggande UDTF-syntax

Apache Spark implementerar Python UDTFs som Python-klasser med en obligatorisk eval metod som använder yield för att generera utdatarader.

Om du vill använda klassen som UDTF måste du importera funktionen PySpark udtf . Databricks rekommenderar att du använder den här funktionen som dekoratör och uttryckligen anger fältnamn och typer med hjälp av returnType alternativet (såvida inte klassen definierar en analyze metod enligt beskrivningen i ett senare avsnitt).

Följande UDTF skapar en tabell med en fast lista med två heltalsargument:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registrera en UDTF

UDF:er registreras lokalt SparkSession och isoleras på notebook- eller jobbnivå.

Du kan inte registrera UDF:er som objekt i Unity Catalog, och UDF:er kan inte användas med SQL-lager.

Du kan registrera en UDTF till aktuell SparkSession för användning i SQL-frågor med funktionen spark.udtf.register(). Ange ett namn för SQL-funktionen och Python UDTF-klassen.

spark.udtf.register("get_sum_diff", GetSumDiff)

Anropa en registrerad UDTF

När du har registrerat dig kan du använda UDTF i SQL med antingen det %sql magiska kommandot eller spark.sql() funktionen:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Använda Apache Arrow

Om UDTF tar emot en liten mängd data som indata men matar ut en stor tabell rekommenderar Databricks att du använder Apache Arrow. Du kan aktivera den genom att ange parametern useArrow när du deklarerar UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Variabelargumentlistor – *args och **kwargs

Du kan använda Python *args eller **kwargs syntax och implementera logik för att hantera ett ospecificerat antal indatavärden.

I följande exempel returneras samma resultat när du uttryckligen kontrollerar indatalängden och typerna för argumenten:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Här är samma exempel, men med nyckelordsargument:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definiera ett statiskt schema vid registreringen

UDTF returnerar rader med ett utdataschema som består av en ordnad sekvens med kolumnnamn och typer. Om UDTF-schemat alltid ska vara detsamma för alla frågor kan du ange ett statiskt, fast schema efter dekoratören @udtf . Det måste antingen vara en StructType:

StructType().add("c1", StringType())

Eller en DDL-sträng som representerar en structtyp:

c1: string

Beräkna ett dynamiskt schema vid funktionsanropstid

UDF:er kan också beräkna utdataschemat programmatiskt för varje anrop beroende på värdena för indataargumenten. För att göra detta definierar du en statisk metod med namnet analyze som accepterar noll eller fler parametrar som motsvarar argumenten som tillhandahålls till det specifika UDTF-anropet.

Varje argument för analyze metoden är en instans av AnalyzeArgument klassen som innehåller följande fält:

AnalyzeArgument klassfält beskrivning
dataType Typen av indataargument som en DataType. För argument för indatatabeller är detta en StructType som representerar tabellens kolumner.
value Värdet för indataargumentet som en Optional[Any]. Detta gäller None för tabellargument eller literalskalära argument som inte är konstanta.
isTable Om indataargumentet är en tabell som en BooleanType.
isConstantExpression Om indataargumentet är ett konstant vikbart uttryck som en BooleanType.

Metoden analyze returnerar en instans av AnalyzeResult klassen, som innehåller resultattabellens schema som ett StructType plus några valfria fält. Om UDTF accepterar ett argument för indatatabellen AnalyzeResult kan du även inkludera ett begärt sätt att partitionera och sortera raderna i indatatabellen över flera UDTF-anrop, enligt beskrivningen senare.

AnalyzeResult klassfält beskrivning
schema Schemat för resultattabellen som en StructType.
withSinglePartition Om alla indatarader ska skickas till samma UDTF-klassinstans som en BooleanType.
partitionBy Om värdet inte är tomt används alla rader med varje unik kombination av värden för partitioneringsuttrycken av en separat instans av UDTF-klassen.
orderBy Om värdet inte är tomt anger detta en ordning på rader inom varje partition.
select Om värdet inte är tomt är detta en sekvens med uttryck som UDTF anger för Catalyst att utvärdera mot kolumnerna i indatatabellargumentet. UDTF tar emot ett indataattribut för varje namn i listan i den ordning de visas.

Det här analyze exemplet returnerar en utdatakolumn för varje ord i argumentet för indatasträngen.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Vidarebefordra tillstånd till framtida eval anrop

Metoden analyze kan fungera som en lämplig plats för att utföra initiering och sedan vidarebefordra resultaten till framtida eval metodanrop för samma UDTF-anrop.

För att göra det skapar du en underklass av AnalyzeResult och returnerar en instans av underklassen analyze från -metoden. Lägg sedan till ytterligare ett argument i metoden för att acceptera den instansen __init__ .

Det här analyze exemplet returnerar ett konstant utdataschema, men lägger till anpassad information i resultatmetadata som ska användas av framtida __init__ metodanrop:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Ge utdatarader

Metoden eval körs en gång för varje rad i indatatabellargumentet (eller bara en gång om inget tabellargument anges), följt av ett anrop av terminate metoden i slutet. Antingen matar metoden ut noll eller fler rader som överensstämmer med resultatschemat genom att ge tupplar, listor eller pyspark.sql.Row objekt.

Det här exemplet returnerar en rad genom att ange en tuppeln med tre element:

def eval(self, x, y, z):
  yield (x, y, z)

Du kan också utelämna parenteserna:

def eval(self, x, y, z):
  yield x, y, z

Lägg till ett avslutande kommatecken för att returnera en rad med endast en kolumn:

def eval(self, x, y, z):
  yield x,

Du kan också ge ett pyspark.sql.Row objekt.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Det här exemplet ger utdatarader från terminate metoden med hjälp av en Python-lista. Du kan lagra tillståndet i klassen från tidigare steg i UDTF-utvärderingen för det här ändamålet.

def terminate(self):
  yield [self.x, self.y, self.z]

Skicka skalära argument till en UDTF

Du kan skicka skalära argument till en UDTF som konstanta uttryck som består av literalvärden eller funktioner baserat på dem. Till exempel:

SELECT * FROM udtf(42, group => upper("finance_department"));

Skicka tabellargument till en UDTF

Python-UDF:er kan acceptera en indatatabell som ett argument utöver skalära indataargument. En enskild UDTF kan också acceptera ett tabellargument och flera skalära argument.

Sedan kan valfri SQL-fråga tillhandahålla en indatatabell med hjälp av nyckelordet TABLE följt av parenteser som omger en lämplig tabellidentifierare, till exempel TABLE(t). Du kan också skicka en tabellunderfråga, till exempel TABLE(SELECT a, b, c FROM t) eller TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Argumentet för indatatabellen representeras sedan som ett pyspark.sql.Row argument för eval metoden, med ett anrop till eval metoden för varje rad i indatatabellen. Du kan använda standardkommentarer för PySpark-kolumnfält för att interagera med kolumner i varje rad. I följande exempel visas explicit import av PySpark-typen Row och sedan filtrering av den skickade tabellen i id fältet:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Om du vill köra frågor mot funktionen använder du SQL-nyckelordet TABLE :

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Ange partitionering av indatarader från funktionsanrop

När du anropar en UDTF med ett tabellargument kan alla SQL-frågor partitionera indatatabellen över flera UDTF-anrop baserat på värdena för en eller flera indatatabellkolumner.

Om du vill ange en partition använder du PARTITION BY -satsen i funktionsanropet TABLE efter argumentet. Detta garanterar att alla indatarader med varje unik kombination av värden i partitioneringskolumnerna förbrukas av exakt en instans av UDTF-klassen.

Observera att förutom enkla kolumnreferenser PARTITION BY accepterar satsen även godtyckliga uttryck baserat på indatatabellkolumner. Du kan till exempel ange LENGTH för en sträng, extrahera en månad från ett datum eller sammanfoga två värden.

Det går också att ange WITH SINGLE PARTITION i stället för PARTITION BY att bara begära en partition där alla indatarader måste förbrukas av exakt en instans av UDTF-klassen.

Inom varje partition kan du ange en obligatorisk ordning på indataraderna eftersom UDTF-metoden eval använder dem. Det gör du genom att ange en ORDER BY sats efter eller-satsen PARTITION BY WITH SINGLE PARTITION som beskrivs ovan.

Tänk till exempel på följande UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Du kan ange partitioneringsalternativ när du anropar UDTF över indatatabellen på flera olika sätt:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Ange en partitionering av indataraderna analyze från metoden

Observera att för vart och ett av ovanstående sätt att partitionera indatatabellen när du anropar UDF:er i SQL-frågor finns det ett motsvarande sätt för UDTF-metoden analyze att ange samma partitioneringsmetod automatiskt i stället.

  • I stället för att anropa en UDTF som SELECT * FROM udtf(TABLE(t) PARTITION BY a)kan du uppdatera analyze metoden för att ange fältet partitionBy=[PartitioningColumn("a")] och helt enkelt anropa funktionen med .SELECT * FROM udtf(TABLE(t))
  • Med samma token kan du i stället för att TABLE(t) WITH SINGLE PARTITION ORDER BY b ange i SQL-frågan ange analyze fälten withSinglePartition=true och orderBy=[OrderingColumn("b")] sedan bara skicka TABLE(t).
  • I stället för att skicka SQL-frågan TABLE(SELECT a FROM t) kan du ställa analyze in select=[SelectedColumn("a")] och sedan bara skicka TABLE(t).

I följande exempel analyze returnerar ett konstant utdataschema, väljer en delmängd kolumner från indatatabellen och anger att indatatabellen är partitionerad över flera UDTF-anrop baserat på kolumnens date värden:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])