Door de gebruiker gedefinieerde Python-tabelfuncties (UDF's)
Belangrijk
Deze functie bevindt zich in openbare preview in Databricks Runtime 14.3 LTS en hoger.
Met een door de gebruiker gedefinieerde tabelfunctie (UDTF) kunt u functies registreren die tabellen retourneren in plaats van scalaire waarden. In tegenstelling tot scalaire functies die één resultaatwaarde retourneren uit elke aanroep, wordt elke UDTF aangeroepen in de component van FROM
een SQL-instructie en wordt een hele tabel als uitvoer geretourneerd.
Elke UDTF-aanroep kan nul of meer argumenten accepteren. Deze argumenten kunnen scalaire expressies of tabelargumenten zijn die volledige invoertabellen vertegenwoordigen.
Eenvoudige UDTF-syntaxis
Apache Spark implementeert Python UDFS als Python-klassen met een verplichte eval
methode die wordt gebruikt yield
om uitvoerrijen te verzenden.
Als u uw klasse wilt gebruiken als UDTF, moet u de Functie PySpark udtf
importeren. Databricks raadt aan deze functie als decorator te gebruiken en expliciet veldnamen en -typen op te geven met behulp van de returnType
optie (tenzij de klasse een analyze
methode definieert zoals beschreven in een latere sectie).
Met de volgende UDTF maakt u een tabel met een vaste lijst met twee gehele getallen:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Een UDTF registreren
UDDF's worden geregistreerd bij de lokale SparkSession
en worden geïsoleerd op notebook- of taakniveau.
U KUNT UDDF's niet registreren als objecten in Unity Catalog en UDDF's kunnen niet worden gebruikt met SQL-warehouses.
U kunt een UDTF registreren bij de huidige SparkSession
voor gebruik in SQL-query's met de functie spark.udtf.register()
. Geef een naam op voor de SQL-functie en de Python UDTF-klasse.
spark.udtf.register("get_sum_diff", GetSumDiff)
Een geregistreerde UDTF aanroepen
Nadat u zich hebt geregistreerd, kunt u de UDTF in SQL gebruiken met behulp van de %sql
magic-opdracht of spark.sql()
functie:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Apache Arrow gebruiken
Als uw UDTF een kleine hoeveelheid gegevens ontvangt als invoer, maar een grote tabel uitvoert, raadt Databricks aan Apache Arrow te gebruiken. U kunt deze inschakelen door de useArrow
parameter op te geven bij het declareren van de UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Lijsten met variabelenargumenten - *args en **kwargs
U kunt Python *args
of **kwargs
syntaxis gebruiken en logica implementeren om een niet-opgegeven aantal invoerwaarden te verwerken.
In het volgende voorbeeld wordt hetzelfde resultaat geretourneerd terwijl de invoerlengte en typen voor de argumenten expliciet worden gecontroleerd:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Dit is hetzelfde voorbeeld, maar het gebruik van trefwoordargumenten:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Een statisch schema definiëren tijdens de registratie
De UDTF retourneert rijen met een uitvoerschema dat bestaat uit een geordende reeks kolomnamen en -typen. Als het UDTF-schema altijd hetzelfde moet blijven voor alle query's, kunt u een statisch, vast schema opgeven na de @udtf
decorator. Het moet een StructType
van de volgende zijn:
StructType().add("c1", StringType())
Of een DDL-tekenreeks die een structtype vertegenwoordigt:
c1: string
Een dynamisch schema berekenen tijdens het aanroepen van functies
UDDF's kunnen het uitvoerschema ook programmatisch berekenen voor elke aanroep, afhankelijk van de waarden van de invoerargumenten. Hiervoor definieert u een statische methode analyze
die nul of meer parameters accepteert die overeenkomen met de argumenten die zijn opgegeven voor de specifieke UDTF-aanroep.
Elk argument van de analyze
methode is een exemplaar van de AnalyzeArgument
klasse die de volgende velden bevat:
AnalyzeArgument klasseveld |
Beschrijving |
---|---|
dataType |
Het type van het invoerargument als een DataType . Voor invoertabelargumenten is dit een StructType weergave van de kolommen van de tabel. |
value |
De waarde van het invoerargument als een Optional[Any] . Dit is None voor tabelargumenten of letterlijke scalaire argumenten die niet constant zijn. |
isTable |
Of het invoerargument een tabel is als een BooleanType . |
isConstantExpression |
Of het invoerargument een constante vouwbare expressie is als een BooleanType . |
De analyze
methode retourneert een exemplaar van de AnalyzeResult
klasse, dat het schema van de resultaattabel bevat als plus StructType
enkele optionele velden. Als de UDTF een invoertabelargument accepteert, kan het AnalyzeResult
ook een aangevraagde manier bevatten om de rijen van de invoertabel te partitioneren en te ordenen voor verschillende UDTF-aanroepen, zoals later wordt beschreven.
AnalyzeResult klasseveld |
Beschrijving |
---|---|
schema |
Het schema van de resultaattabel als een StructType . |
withSinglePartition |
Of alle invoerrijen moeten worden verzonden naar hetzelfde UDTF-klasse-exemplaar als een BooleanType . |
partitionBy |
Als deze optie is ingesteld op niet-leeg, worden alle rijen met elke unieke combinatie van waarden van de partitioneringsexpressies gebruikt door een afzonderlijk exemplaar van de UDTF-klasse. |
orderBy |
Als deze optie is ingesteld op niet-leeg, geeft dit een volgorde van rijen binnen elke partitie op. |
select |
Als dit is ingesteld op niet-leeg, is dit een reeks expressies die de UDTF opgeeft voor Katalysator om te evalueren op basis van de kolommen in het argument invoerTABEL. De UDTF ontvangt één invoerkenmerk voor elke naam in de lijst in de volgorde waarin ze worden vermeld. |
In dit analyze
voorbeeld wordt één uitvoerkolom geretourneerd voor elk woord in het argument invoertekenreeks.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Status doorschakelen naar toekomstige eval
aanroepen
De analyze
methode kan fungeren als een handige plaats om initialisatie uit te voeren en de resultaten vervolgens door te sturen naar toekomstige eval
methode-aanroepen voor dezelfde UDTF-aanroep.
Hiervoor maakt u een subklasse van AnalyzeResult
en retourneert u een exemplaar van de subklasse van de analyze
methode.
Voeg vervolgens een extra argument toe aan de __init__
methode om dat exemplaar te accepteren.
In dit analyze
voorbeeld wordt een constant uitvoerschema geretourneerd, maar worden aangepaste gegevens toegevoegd in de metagegevens van de resultaten die moeten worden gebruikt door toekomstige __init__
methode-aanroepen:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Uitvoerrijen opleveren
De eval
methode wordt eenmaal uitgevoerd voor elke rij van het argument invoertabel (of slechts één keer als er geen tabelargument wordt opgegeven), gevolgd door één aanroep van de terminate
methode aan het einde. De methode voert nul of meer rijen uit die voldoen aan het resultaatschema door tuples, lijsten of pyspark.sql.Row
objecten te genereren.
In dit voorbeeld wordt een rij geretourneerd door een tuple van drie elementen op te geven:
def eval(self, x, y, z):
yield (x, y, z)
U kunt ook de haakjes weglaten:
def eval(self, x, y, z):
yield x, y, z
Voeg een volgkomma toe om een rij met slechts één kolom te retourneren:
def eval(self, x, y, z):
yield x,
U kunt ook een pyspark.sql.Row
object opleveren.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
In dit voorbeeld worden uitvoerrijen van de terminate
methode geretourneerd met behulp van een Python-lijst. U kunt de status in de klasse opslaan uit eerdere stappen in de UDTF-evaluatie voor dit doel.
def terminate(self):
yield [self.x, self.y, self.z]
Scalaire argumenten doorgeven aan een UDTF
U kunt scalaire argumenten doorgeven aan een UDTF als constante expressies die letterlijke waarden of functies bevatten op basis hiervan. Voorbeeld:
SELECT * FROM udtf(42, group => upper("finance_department"));
Tabelargumenten doorgeven aan een UDTF
Python UDFS kunnen naast scalaire invoerargumenten een invoertabel als argument accepteren. Eén UDTF kan ook een tabelargument en meerdere scalaire argumenten accepteren.
Vervolgens kan elke SQL-query een invoertabel opgeven met behulp van het TABLE
trefwoord gevolgd door haakjes rond een geschikte tabel-id, zoals TABLE(t)
. U kunt ook een tabelsubquery doorgeven, zoals TABLE(SELECT a, b, c FROM t)
of TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
Het argument invoertabel wordt vervolgens weergegeven als een pyspark.sql.Row
argument voor de eval
methode, met één aanroep naar de eval
methode voor elke rij in de invoertabel. U kunt standaardaantekeningen voor pySpark-kolomvelden gebruiken om te communiceren met kolommen in elke rij. In het volgende voorbeeld ziet u hoe u het PySpark-type Row
expliciet importeert en vervolgens de doorgegeven tabel in het id
veld filtert:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Als u een query wilt uitvoeren op de functie, gebruikt u het TABLE
trefwoord SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Een partitionering van de invoerrijen van functie-aanroepen opgeven
Wanneer u een UDTF aanroept met een tabelargument, kan elke SQL-query de invoertabel partitioneren in verschillende UDTF-aanroepen op basis van de waarden van een of meer invoertabelkolommen.
Als u een partitie wilt opgeven, gebruikt u de PARTITION BY
component in de functieaanroep na het TABLE
argument.
Dit garandeert dat alle invoerrijen met elke unieke combinatie van waarden van de partitioneringskolommen worden verbruikt door precies één exemplaar van de UDTF-klasse.
Naast eenvoudige kolomverwijzingen accepteert de PARTITION BY
component ook willekeurige expressies op basis van invoertabelkolommen. U kunt bijvoorbeeld de LENGTH
tekenreeks opgeven, een maand uit een datum extraheren of twee waarden samenvoegen.
Het is ook mogelijk om op te geven WITH SINGLE PARTITION
in plaats van PARTITION BY
slechts één partitie aan te vragen waarbij alle invoerrijen moeten worden gebruikt door precies één exemplaar van de UDTF-klasse.
Binnen elke partitie kunt u desgewenst een vereiste volgorde van de invoerrijen opgeven, omdat de methode UDTF eval
deze gebruikt. Als u dit wilt doen, geeft u een ORDER BY
component op na de PARTITION BY
hierboven beschreven of WITH SINGLE PARTITION
component.
Denk bijvoorbeeld aan de volgende UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
U kunt partitioneringsopties opgeven bij het aanroepen van de UDTF via de invoertabel op meerdere manieren:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Een partitionering van de invoerrijen van de analyze
methode opgeven
Houd er rekening mee dat voor elk van de bovenstaande manieren om de invoertabel te partitioneren bij het aanroepen van UDTF's in SQL-query's, er een overeenkomstige manier is voor de methode van de UDTF analyze
om in plaats daarvan automatisch dezelfde partitioneringsmethode op te geven.
- In plaats van een UDTF aan te roepen,
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
kunt u deanalyze
methode bijwerken om het veldpartitionBy=[PartitioningColumn("a")]
in te stellen en gewoon de functie aan te roepen met behulp vanSELECT * FROM udtf(TABLE(t))
. - Met hetzelfde token, in plaats van op te geven
TABLE(t) WITH SINGLE PARTITION ORDER BY b
in de SQL-query, kuntanalyze
u de veldenwithSinglePartition=true
instellen enorderBy=[OrderingColumn("b")]
vervolgens gewoon doorgevenTABLE(t)
. - In plaats van de SQL-query door te geven
TABLE(SELECT a FROM t)
, kuntanalyze
u instellenselect=[SelectedColumn("a")]
en vervolgens gewoon doorgevenTABLE(t)
.
In het volgende voorbeeld analyze
wordt een constant uitvoerschema geretourneerd, wordt een subset van kolommen uit de invoertabel geselecteerd en wordt aangegeven dat de invoertabel wordt gepartitioneerd voor verschillende UDTF-aanroepen op basis van de waarden van de date
kolom:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])