Användardefinierade python-tabellfunktioner (UDF: er)
Viktig
Den här funktionen finns i offentlig förhandsversion i Databricks Runtime 14.3 LTS och senare.
Med en användardefinierad tabellfunktion (UDTF) kan du registrera funktioner som returnerar tabeller i stället för skalära värden. Till skillnad från skalärfunktioner som returnerar ett enda resultatvärde från varje anrop anropas varje UDTF i sql-instruktionens FROM
-sats och returnerar en hel tabell som utdata.
Varje UDTF-anrop kan acceptera noll eller fler argument. Dessa argument kan vara skalära uttryck eller tabellargument som representerar hela indatatabeller.
Grundläggande UDTF-syntax
Apache Spark implementerar Python UDTFs som Python-klasser med en obligatorisk eval
-metod som använder yield
för att generera utdatarader.
Om du vill använda klassen som UDTF måste du importera funktionen PySpark udtf
. Databricks rekommenderar att du använder den här funktionen som dekoratör och uttryckligen anger fältnamn och typer med hjälp av alternativet returnType
(såvida inte klassen definierar en analyze
-metod enligt beskrivningen i ett senare avsnitt).
Följande UDTF skapar en tabell med en fast lista med två heltalsargument:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrera en UDTF
UDTF:er är registrerade till det lokala SparkSession
och är isolerade på notebook- eller jobbnivå.
Du kan inte registrera UDF:er som objekt i Unity Catalog, och UDF:er kan inte användas med SQL-lager.
Du kan registrera en UDTF till den aktuella SparkSession
för användning i SQL-frågor med funktionen spark.udtf.register()
. Ange ett namn för SQL-funktionen och Python UDTF-klassen.
spark.udtf.register("get_sum_diff", GetSumDiff)
Anropa en registrerad UDTF
När du har registrerat dig kan du använda UDTF i SQL med antingen det %sql
magiska kommandot eller spark.sql()
funktionen:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Använda Apache Arrow
Om UDTF tar emot en liten mängd data som indata men matar ut en stor tabell rekommenderar Databricks att du använder Apache Arrow. Du kan aktivera den genom att ange parametern useArrow
när du deklarerar UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Variabelargumentlistor – *args och **kwargs
Du kan använda Python-*args
eller **kwargs
syntax och implementera logik för att hantera ett ospecificerat antal indatavärden.
I följande exempel returneras samma resultat när du uttryckligen kontrollerar indatalängden och typerna för argumenten:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Här är samma exempel, men med nyckelordsargument:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definiera ett statiskt schema vid registreringen
UDTF returnerar rader med ett utdataschema som består av en ordnad sekvens med kolumnnamn och typer. Om UDTF-schemat alltid ska vara detsamma för alla frågor kan du ange ett statiskt, fast schema efter @udtf
dekoratör. Det måste antingen vara en StructType
:
StructType().add("c1", StringType())
Eller en DDL-sträng som representerar en structtyp:
c1: string
Beräkna ett dynamiskt schema vid funktionsanropstid
UDF:er kan också beräkna utdataschemat programmatiskt för varje anrop beroende på värdena för indataargumenten. För att göra detta definierar du en statisk metod som kallas analyze
som accepterar noll eller fler parametrar som motsvarar argumenten som tillhandahålls till det specifika UDTF-anropet.
Varje argument i metoden analyze
är en instans av klassen AnalyzeArgument
som innehåller följande fält:
AnalyzeArgument klassfält |
Beskrivning |
---|---|
dataType |
Typen av indataargument som en DataType . För argument för indatatabeller är detta en StructType som representerar tabellens kolumner. |
value |
Värdet för indataargumentet som en Optional[Any] . Det här är None för tabellargument eller literalskalära argument som inte är konstanta. |
isTable |
Om indataargumentet är en tabell som BooleanType . |
isConstantExpression |
Om indataargumentet är ett konstant vikbart uttryck som en BooleanType . |
Metoden analyze
returnerar en instans av klassen AnalyzeResult
, som innehåller resultattabellens schema som ett StructType
plus några valfria fält. Om UDTF accepterar ett argument i indatatabellen kan AnalyzeResult
också inkludera ett begärt sätt att partitionera och beställa raderna i indatatabellen över flera UDTF-anrop, enligt beskrivningen senare.
AnalyzeResult klassfält |
Beskrivning |
---|---|
schema |
Schemat för resultattabellen som en StructType . |
withSinglePartition |
Om alla indatarader ska skickas till samma UDTF-klassinstans som en BooleanType . |
partitionBy |
Om värdet inte är tomt används alla rader med varje unik kombination av värden för partitioneringsuttrycken av en separat instans av UDTF-klassen. |
orderBy |
Om värdet inte är tomt anger detta en ordning på rader inom varje partition. |
select |
Om värdet inte är tomt är detta en sekvens med uttryck som UDTF anger för Catalyst att utvärdera mot kolumnerna i indata TABLE argumentet. UDTF tar emot ett indataattribut för varje namn i listan i den ordning de visas. |
Det här analyze
exemplet returnerar en utdatakolumn för varje ord i argumentet för indatasträngen.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Vidarebefordra status till framtida eval
-anrop
Metoden analyze
kan fungera som en praktisk plats för att utföra initiering och sedan vidarebefordra resultatet till framtida eval
metodanrop för samma UDTF-anrop.
För att göra det skapar du en underklass av AnalyzeResult
och returnerar en instans av underklassen från metoden analyze
.
Lägg sedan till ytterligare ett argument i metoden __init__
för att acceptera den instansen.
Det här analyze
exemplet returnerar ett konstant utdataschema, men lägger till anpassad information i resultatmetadata som ska användas av framtida __init__
-metodanrop:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Ge utdatarader
Metoden eval
körs en gång för varje rad i argumentet för indatatabellen (eller bara en gång om inget tabellargument anges), följt av ett anrop av metoden terminate
i slutet. Antingen matar metoden ut noll eller fler rader som överensstämmer med resultatschemat genom att ge tupplar, listor eller pyspark.sql.Row
objekt.
Det här exemplet returnerar en rad genom att ange en tupl med tre element.
def eval(self, x, y, z):
yield (x, y, z)
Du kan också utelämna parenteserna:
def eval(self, x, y, z):
yield x, y, z
Lägg till ett avslutande kommatecken för att returnera en rad med endast en kolumn:
def eval(self, x, y, z):
yield x,
Du kan också generera ett pyspark.sql.Row
-objekt.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
Det här exemplet ger utdatarader från metoden terminate
med hjälp av en Python-lista. Du kan lagra tillståndet i klassen från tidigare steg i UDTF-utvärderingen för det här ändamålet.
def terminate(self):
yield [self.x, self.y, self.z]
Skicka skalära argument till en UDTF
Du kan skicka skalära argument till en UDTF som konstanta uttryck som består av literalvärden eller funktioner baserat på dem. Till exempel:
SELECT * FROM udtf(42, group => upper("finance_department"));
Skicka tabellargument till en UDTF
Python-UDF:er kan acceptera en indatatabell som ett argument utöver skalära indataargument. En enskild UDTF kan också acceptera ett tabellargument och flera skalära argument.
Sedan kan alla SQL-frågor tillhandahålla en indatatabell med hjälp av nyckelordet TABLE
följt av parenteser som omger en lämplig tabellidentifierare, till exempel TABLE(t)
. Du kan också skicka en tabellunderfråga, till exempel TABLE(SELECT a, b, c FROM t)
eller TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
Argumentet för indatatabellen representeras sedan som ett pyspark.sql.Row
argument för metoden eval
, med ett anrop till metoden eval
för varje rad i indatatabellen. Du kan använda standardkommentarer för PySpark-kolumnfält för att interagera med kolumner i varje rad. I följande exempel visas explicit import av PySpark-Row
typ och sedan filtrering av den skickade tabellen i fältet id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Om du vill köra frågor mot funktionen använder du nyckelordet TABLE
SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Ange en partitionering av indatarader för funktionsanrop
När du anropar en UDTF med ett tabellargument kan alla SQL-frågor partitionera indatatabellen över flera UDTF-anrop baserat på värdena för en eller flera indatatabellkolumner.
Om du vill ange en partition använder du PARTITION BY
-satsen i funktionsanropet efter argumentet TABLE
.
Detta garanterar att alla indatarader med varje unik kombination av värden i partitioneringskolumnerna hanteras av exakt en instans av UDTF-klassen.
Observera att förutom enkla kolumnreferenser accepterar PARTITION BY
-satsen även godtyckliga uttryck baserat på indatatabellkolumner. Du kan till exempel ange LENGTH
för en sträng, extrahera en månad från ett datum eller sammanfoga två värden.
Det är också möjligt att ange WITH SINGLE PARTITION
istället för PARTITION BY
för att begära endast en partition där alla indatarader måste konsumeras av exakt en instans av UDTF-klassen.
Inom varje partition kan du ange en obligatorisk ordning på indataraderna eftersom UDTF:s eval
-metod använder dem. Det gör du genom att ange en ORDER BY
-sats efter satsen PARTITION BY
eller WITH SINGLE PARTITION
som beskrivs ovan.
Tänk till exempel på följande UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Du kan ange partitioneringsalternativ när du anropar UDTF över indatatabellen på flera olika sätt:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Ange en partitionering av indataraderna från metoden analyze
Observera att för vart och ett av ovanstående sätt att partitionera indatatabellen när du anropar UDF:er i SQL-frågor finns det ett motsvarande sätt för UDTF:s analyze
-metod att ange samma partitioneringsmetod automatiskt i stället.
- I stället för att anropa en UDTF som
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
kan du uppdatera metodenanalyze
för att ange fältetpartitionBy=[PartitioningColumn("a")]
och helt enkelt anropa funktionen med hjälp avSELECT * FROM udtf(TABLE(t))
. - Med samma token kan du i stället för att ange
TABLE(t) WITH SINGLE PARTITION ORDER BY b
i SQL-frågan göraanalyze
ange fältenwithSinglePartition=true
ochorderBy=[OrderingColumn("b")]
och sedan bara skickaTABLE(t)
. - I stället för att skicka
TABLE(SELECT a FROM t)
i SQL-frågan kan du göraanalyze
angeselect=[SelectedColumn("a")]
och sedan bara skickaTABLE(t)
.
I följande exempel returnerar analyze
ett konstant utdataschema, väljer en delmängd av kolumner från indatatabellen och anger att indatatabellen är partitionerad över flera UDTF-anrop baserat på värdena i kolumnen date
:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])