Door de gebruiker gedefinieerde Python-table-functies (UDDF's)
Belangrijk
Deze functie bevindt zich in Publieke Preview in Databricks Runtime 14.3 LTS en hoger.
Met een door de gebruiker gedefinieerde table-functie (UDTF) kunt u functies registreren die tables retourneren in plaats van scalaire values. In tegenstelling tot scalaire functies die één resultaatwaarde retourneren uit elke aanroep, wordt elke UDTF aangeroepen in de FROM
-component van een SQL-instructie en wordt een volledige table als uitvoer geretourneerd.
Elke UDTF-aanroep kan nul of meer argumenten accepteren. Deze argumenten kunnen scalaire expressies zijn of table argumenten die volledige invoer tablesvertegenwoordigen.
Eenvoudige UDTF-syntaxis
Apache Spark implementeert Python UDFS als Python-klassen met een verplichte eval
methode die gebruikmaakt van yield
om uitvoerrijen te verzenden.
Als u uw klasse als UDTF wilt gebruiken, moet u de functie PySpark udtf
importeren. Databricks raadt aan deze functie als decorator te gebruiken en expliciet veldnamen en -typen op te geven met behulp van de optie returnType
(tenzij de klasse een analyze
methode definieert zoals beschreven in een latere sectie).
Met de volgende UDTF maakt u een table met behulp van een vaste list van twee gehele getallen:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Een UDTF registreren
UDTF's worden geregistreerd bij lokale SparkSession
en worden geïsoleerd op notebook- of taakniveau.
U kunt UDDF's niet registreren als objecten in Unity Catalogen UDDF's kunnen niet worden gebruikt met SQL-warehouses.
U kunt een UDTF registreren bij de huidige SparkSession
voor gebruik in SQL-query's met de functie spark.udtf.register()
. Geef een naam op voor de SQL-functie en de Python UDTF-klasse.
spark.udtf.register("get_sum_diff", GetSumDiff)
Een geregistreerde UDTF aanroepen
Nadat u zich hebt geregistreerd, kunt u de UDTF in SQL gebruiken met behulp van de %sql
magic-opdracht of spark.sql()
functie:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Apache Arrow gebruiken
Als uw UDTF een kleine hoeveelheid gegevens ontvangt als invoer, maar een grote tableuitvoert, raadt Databricks aan Apache Arrow te gebruiken. U kunt deze inschakelen door de parameter useArrow
op te geven bij het declareren van de UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Lijsten met variabelenargumenten - *args en **kwargs
U kunt Python *args
of **kwargs
syntaxis gebruiken en logica implementeren om een niet-opgegeven aantal invoer-valueste verwerken.
In het volgende voorbeeld wordt hetzelfde resultaat geretourneerd terwijl de invoerlengte en typen voor de argumenten expliciet worden gecontroleerd:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Dit is hetzelfde voorbeeld, maar het gebruik van trefwoordargumenten:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Een statische schema definiëren op het moment van registratie
De UDTF retourneert rijen met een uitvoer schema bestaande uit een geordende reeks column namen en typen. Als de UDTF-schema altijd hetzelfde moet blijven voor alle query's, kunt u een statische, vaste schema opgeven na de @udtf
decorator. Het moet een StructType
zijn:
StructType().add("c1", StringType())
Of een DDL-tekenreeks die een structtype vertegenwoordigt:
c1: string
Bereken een dynamische schema tijdens het aanroepen van een functie
UDDF's kunnen ook de uitvoer schema programmatisch berekenen voor elke aanroep, afhankelijk van de values van de invoerargumenten. Hiervoor definieert u een statische methode met de naam analyze
die nul of meer parameters accepteert die overeenkomt met de argumenten die zijn opgegeven voor de specifieke UDTF-aanroep.
Elk argument van de methode analyze
is een exemplaar van de AnalyzeArgument
klasse die de volgende velden bevat:
AnalyzeArgument klasseattribuut |
Beschrijving |
---|---|
dataType |
Het type van het invoerargument als een DataType . Voor invoer-table argumenten is dit een StructType die de columnsvan de tablevertegenwoordigt. |
value |
De waarde van het invoerargument als een Optional[Any] . Dit is None voor table argumenten of letterlijke scalaire argumenten die niet constant zijn. |
isTable |
Of het invoerargument een table is als een BooleanType . |
isConstantExpression |
Of het invoerargument een constante vouwbare expressie is als een BooleanType . |
De methode analyze
geeft een instantie van de klasse AnalyzeResult
terug, die het resultaat table's schema als een StructType
plus enkele optionele velden bevat. Als de UDTF een invoer table-argument accepteert, kan de AnalyzeResult
ook een aangevraagde methode bevatten om de rijen van de invoer table te partition en te ordenen gedurende verschillende UDTF-aanroepen, zoals later wordt beschreven.
AnalyzeResult klasseattribuut |
Beschrijving |
---|---|
schema |
De schema van het resultaat table als een StructType . |
withSinglePartition |
Of alle invoerrijen moeten worden verzonden naar hetzelfde UDTF-klasse-exemplaar als een BooleanType . |
partitionBy |
Als set niet leeg is, worden alle rijen met elke unieke combinatie van de partities values verwerkt door een afzonderlijk voorbeeld van de UDTF-klasse. |
orderBy |
Als set niet leeg is, geeft dit een volgorde van rijen binnen elke partitionaan. |
select |
Als set niet leeg is, is dit een reeks uitdrukkingen die de UDTF voor Catalyst specificeert om te evalueren tegen de columns in het invoerargument TABLE. De UDTF ontvangt één invoerkenmerk voor elke naam in de list in de volgorde waarin ze worden vermeld. |
Dit analyze
voorbeeld retourneert één uitvoer column voor elk woord in de invoertekenreeks argument.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Status doorsturen naar toekomstige eval
-aanroepen
De methode analyze
kan fungeren als een handige plaats om initialisatie uit te voeren en de resultaten vervolgens door te sturen naar toekomstige eval
methode-aanroepen voor dezelfde UDTF-aanroep.
Hiervoor maakt u een subklasse van AnalyzeResult
en retourneert u een exemplaar van de subklasse van de analyze
methode.
Voeg vervolgens een extra argument toe aan de methode __init__
om dat exemplaar te accepteren.
Dit analyze
voorbeeld retourneert een constante uitvoer schema, maar voegt aangepaste gegevens toe in de metagegevens van het resultaat die moeten worden gebruikt door toekomstige __init__
methode-aanroepen:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Uitvoerrijen genereren
De methode eval
wordt eenmaal uitgevoerd voor elke rij van het argument invoer table (of slechts één keer als er geen table argument wordt opgegeven), gevolgd door één aanroep van de methode terminate
aan het einde. De methode voert nul of meer rijen uit die voldoen aan het resultaat schema door tuples, lijsten of pyspark.sql.Row
objecten te genereren.
In dit voorbeeld wordt een rij geretourneerd door een tuple van drie elementen op te geven:
def eval(self, x, y, z):
yield (x, y, z)
U kunt ook de haakjes weglaten:
def eval(self, x, y, z):
yield x, y, z
Voeg een volgkomma toe om een rij met slechts één columnte retourneren:
def eval(self, x, y, z):
yield x,
U kunt ook een pyspark.sql.Row
object opleveren.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
In dit voorbeeld worden uitvoerrijen van de terminate
-methode gegenereerd met Python list. U kunt de status in de klasse opslaan uit eerdere stappen in de UDTF-evaluatie voor dit doel.
def terminate(self):
yield [self.x, self.y, self.z]
Scalaire argumenten doorgeven aan een UDTF
U kunt scalaire argumenten doorgeven aan een UDTF als constante expressies die bestaan uit letterlijke values of functies op basis hiervan. Bijvoorbeeld:
SELECT * FROM udtf(42, group => upper("finance_department"));
Argumenten table doorgeven aan een UDTF
Python UDFS kunnen naast scalaire invoerargumenten een invoer-table accepteren als argument. Eén UDTF kan ook een table argument en meerdere scalaire argumenten accepteren.
Vervolgens kan elke SQL-query een invoer table met behulp van het TABLE
trefwoord gevolgd door haakjes rond een geschikte tableidentifier, zoals TABLE(t)
. U kunt ook een table subquery doorgeven, zoals TABLE(SELECT a, b, c FROM t)
of TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
Het argument invoer table wordt vervolgens weergegeven als een pyspark.sql.Row
argument voor de methode eval
, met één aanroep naar de eval
methode voor elke rij in de invoer table. U kunt standaardaantekeningen voor PySpark column gebruiken om in elke rij met columns te communiceren. In het volgende voorbeeld ziet u hoe u het type PySpark-Row
expliciet importeert en vervolgens de doorgegeven table in het veld id
filtert:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Als u een query wilt uitvoeren op de functie, gebruikt u het TABLE
SQL-trefwoord:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Specificeer een partitionering van de invoerrijen uit functie-aanroepen
Wanneer u een UDTF aanroept met een table-argument, kan elke SQL-query de invoer-table over meerdere UDTF-aanroepen partition op basis van de values van een of meer invoer-tablecolumns.
Als u een partitionwilt opgeven, gebruik de PARTITION BY
clausule in de functieaanroep na het argument TABLE
.
Dit garandeert dat alle invoerrijen met elke unieke combinatie van values van de partitie-columnsget verbruikt worden door precies één exemplaar van de UDTF-klasse.
Naast eenvoudige column verwijzingen accepteert de PARTITION BY
component ook willekeurige expressies op basis van invoer tablecolumns. U kunt bijvoorbeeld de LENGTH
van een tekenreeks opgeven, een maand uit een datum extraheren of twee valuessamenvoegen.
Het is ook mogelijk om WITH SINGLE PARTITION
op te geven in plaats van PARTITION BY
om slechts één partition op te vragen waarbij alle invoerrijen moeten worden gebruikt door precies één exemplaar van de UDTF-klasse.
Binnen elke partitionkunt u desgewenst een vereiste volgorde van de invoerrijen opgeven, omdat de eval
methode van de UDTF deze gebruikt. Geef hiervoor een ORDER BY
component op na de hierboven beschreven PARTITION BY
of WITH SINGLE PARTITION
component.
Denk bijvoorbeeld aan de volgende UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
U kunt partitioneringsopties opgeven bij het aanroepen van de UDTF via de invoer table op meerdere manieren:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Een partitionering van de invoerrijen van de analyze
-methode opgeven
Houd er rekening mee dat voor elk van de bovenstaande manieren om de invoer te partitioneren table bij het aanroepen van UDFS in SQL-query's, er een overeenkomstige manier is voor de analyze
methode van de UDTF om in plaats daarvan automatisch dezelfde partitioneringsmethode op te geven.
- In plaats van een UDTF aan te roepen als
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, kunt u de methodeanalyze
update om het veldpartitionBy=[PartitioningColumn("a")]
te set en de functie gewoon aanroepen met behulp vanSELECT * FROM udtf(TABLE(t))
. - Met hetzelfde token kunt u, in plaats van
TABLE(t) WITH SINGLE PARTITION ORDER BY b
op te geven in de SQL-query, de veldenanalyze
setwithSinglePartition=true
enorderBy=[OrderingColumn("b")]
maken en vervolgens gewoonTABLE(t)
doorgeven. - In plaats van
TABLE(SELECT a FROM t)
door te geven in de SQL-query, kunt uanalyze
setselect=[SelectedColumn("a")]
maken en vervolgensTABLE(t)
doorgeven.
In het volgende voorbeeld retourneert analyze
een constante uitvoer schema, selecteert u een subset van columns uit de invoer tableen geeft u op dat de invoer table is gepartitioneerd voor verschillende UDTF-aanroepen op basis van de values van de date
column:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])