Uživatelem definované funkce tabulek v Pythonu (UDTFs)
Důležitý
Tato funkce je ve verzi Public Preview v Databricks Runtime 14.3 LTS a vyšší.
Uživatelem definovaná funkce tabulky (UDTF) umožňuje registrovat funkce, které vracejí tabulky místo skalárních hodnot. Na rozdíl od skalárních funkcí, které vrací jednu výslednou hodnotu z každého volání, je každý UDTF vyvolán v klauzuli FROM
příkazu SQL a vrací celou tabulku jako výstup.
Každé volání UDTF může přijímat nula nebo více argumentů. Tyto argumenty mohou být skalární výrazy nebo argumenty tabulky představující celé vstupní tabulky.
Základní syntaxe UDTF
Apache Spark implementuje Python UDTF jako třídy Pythonu s nezbytnou metodou eval
, která používá yield
k generování výstupních řádků.
Pokud chcete třídu použít jako UDTF, musíte importovat funkci PySpark udtf
. Databricks doporučuje používat tuto funkci jako dekorátor a explicitně zadávat názvy a typy polí pomocí možnosti returnType
(pokud třída nedefinuje metodu analyze
, jak je popsáno v další části).
Následující funkce UDTF vytvoří tabulku s pevným seznamem dvou celých argumentů:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Zaregistrujte UDTF
UDTFs jsou registrovány v místním SparkSession
a jsou izolovány na úrovni poznámkového bloku nebo úlohy.
UDTFs nelze zaregistrovat jako objekty v katalogu Unity a UDTFs nelze použít se sklady SQL.
UDTF můžete zaregistrovat do aktuálního SparkSession
pro použití v dotazech SQL pomocí funkce spark.udtf.register()
. Zadejte název funkce SQL a třídy UDTF Pythonu.
spark.udtf.register("get_sum_diff", GetSumDiff)
Zavolejte registrované UDTF
Po registraci můžete použít UDTF v SQL pomocí kouzelného příkazu %sql
nebo funkce spark.sql()
.
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Použití Apache Arrow
Pokud váš UDTF přijímá jako vstup malé množství dat, ale vypíše velkou tabulku, databricks doporučuje použít Apache Arrow. Můžete ho povolit zadáním parametru useArrow
při deklarování UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Seznamy argumentů proměnných – *args a **kwargs
K zpracování nezadaného počtu vstupních hodnot můžete použít *args
Pythonu nebo syntaxi **kwargs
a implementovat logiku.
Následující příklad vrátí stejný výsledek při explicitní kontrole vstupní délky a typů argumentů:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Tady je stejný příklad, ale použití argumentů klíčových slov:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definování statického schématu v době registrace
UDTF vrací řádky s výstupním schématem, které tvoří seřazenou sekvenci názvů a typů sloupců. Pokud by schéma UDTF vždy mělo zůstat stejné pro všechny dotazy, můžete za @udtf
dekorátoru zadat statické pevné schéma. Musí se jednat o StructType
:
StructType().add("c1", StringType())
Nebo řetězec DDL představující typ struktury:
c1: string
Výpočet dynamického schématu v době volání funkce
Uživatelem definované tabulkové funkce (UDTFs) mohou také programově vypočítat výstupní schéma pro každý hovor na základě hodnot vstupních argumentů. Chcete-li to provést, definujte statickou metodu s názvem analyze
, která přijímá nula nebo více parametrů, které odpovídají argumentům zadaným pro konkrétní volání UDTF.
Každý argument analyze
metody je instance třídy AnalyzeArgument
, která obsahuje následující pole:
pole třídy AnalyzeArgument |
Popis |
---|---|
dataType |
Typ vstupního argumentu jako DataType . U argumentů vstupní tabulky se jedná o StructType představující sloupce tabulky. |
value |
Hodnota vstupního argumentu jako Optional[Any] . To je None pro argumenty tabulky nebo literální skalární argumenty, které nejsou konstantní. |
isTable |
Zda je vstupní argument tabulka jako je BooleanType . |
isConstantExpression |
Zda vstupní argument je konstantní skládací výraz jako BooleanType . |
Metoda analyze
vrátí instanci třídy AnalyzeResult
, která zahrnuje schéma výsledné tabulky jako StructType
plus některá volitelná pole. Pokud UDTF přijímá argument vstupní tabulky, pak AnalyzeResult
může také obsahovat požadovaný způsob rozdělení a pořadí řádků vstupní tabulky napříč několika voláními UDTF, jak je popsáno později.
pole třídy AnalyzeResult |
Popis |
---|---|
schema |
Schéma výsledné tabulky jako StructType . |
withSinglePartition |
Zda odeslat všechny vstupní řádky do stejné instance třídy UDTF jako BooleanType . |
partitionBy |
Pokud je nastavena na neprázdnou hodnotu, všechny řádky s každou jedinečnou kombinací hodnot partičních výrazů jsou zpracovány samostatnou instancí třídy UDTF. |
orderBy |
Pokud je nastavená hodnota neprázdná, určuje uspořádání řádků v rámci každého oddílu. |
select |
Pokud je nastavená na neprázdnou, jedná se o sekvenci výrazů, které UDTF určuje, aby nástroj Catalyst vyhodnotil sloupce ve vstupním TABLE argumentu. UDTF obdrží jeden vstupní atribut pro každý název v seznamu v pořadí, v jakém jsou uvedeny. |
Tento analyze
příklad vrátí jeden výstupní sloupec pro každé slovo ve vstupním řetězcovém argumentu.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Předání stavu do budoucích hovorů eval
Metoda analyze
může sloužit jako vhodné místo k provedení inicializace a následnému předání výsledků do budoucího volání metody eval
pro stejný hovor UDTF.
Uděláte to tak, že vytvoříte podtřídu AnalyzeResult
a vrátíte instanci podtřídy z analyze
metody.
Potom do metody __init__
přidejte další argument, který tuto instanci přijme.
Tento analyze
příklad vrátí konstantní výstupní schéma, ale přidá vlastní informace do metadat výsledku, která budou využita budoucími voláními metody __init__
:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Generování výstupních řádků
Metoda eval
se spustí jednou pro každý řádek argumentu vstupní tabulky (nebo jen jednou, pokud není zadán žádný argument tabulky), za ním následuje jedno vyvolání terminate
metody na konci. Metoda buď vypíše nula nebo více řádků, které odpovídají výslednému schématu tím, že vrátí řazené kolekce členů, seznamy nebo pyspark.sql.Row
objekty.
Tento příklad vrací řádek pomocí trojice tří prvků.
def eval(self, x, y, z):
yield (x, y, z)
Můžete také vynechat závorky:
def eval(self, x, y, z):
yield x, y, z
Přidejte koncovou čárku, která vrátí řádek pouze s jedním sloupcem:
def eval(self, x, y, z):
yield x,
Můžete také získat objekt pyspark.sql.Row
.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
Tento příklad poskytuje výstupní řádky z metody terminate
pomocí seznamu Pythonu. Stav můžete uložit uvnitř třídy z předchozích kroků při vyhodnocování UDTF, a to pro tento účel.
def terminate(self):
yield [self.x, self.y, self.z]
Předání skalárních argumentů do UDTF
Skalární argumenty můžete předat do UDTF jako konstantní výrazy, které obsahují hodnoty literálů nebo funkce založené na nich. Například:
SELECT * FROM udtf(42, group => upper("finance_department"));
Předání argumentů tabulky do UDTF
Uživatelsky definované tabulkové funkce Pythonu mohou kromě skalárních vstupních argumentů přijmout také vstupní tabulku jako argument. Jeden UDTF může také přijmout argument tabulky a více skalárních argumentů.
Každý dotaz SQL pak může poskytnout vstupní tabulku pomocí klíčového slova TABLE
následované závorky kolem příslušného identifikátoru tabulky, jako je TABLE(t)
. Alternativně můžete předat poddotaz tabulky, například TABLE(SELECT a, b, c FROM t)
nebo TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
Argument vstupní tabulky je pak reprezentován jako pyspark.sql.Row
argument metody eval
s jedním voláním metody eval
pro každý řádek ve vstupní tabulce. K interakci se sloupci v jednotlivých řádcích můžete použít standardní poznámky polí sloupců PySpark. Následující příklad ukazuje explicitní import typu PySpark Row
a následné filtrování předané tabulky v poli id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
K dotazování funkce použijte klíčové slovo TABLE
SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Uveďte rozdělení vstupních řádků při volání funkce
Při volání UDTF s argumentem tabulky může každý dotaz SQL rozdělit vstupní tabulku na několik volání UDTF na základě hodnot jednoho nebo více vstupních sloupců tabulky.
Pokud chcete zadat oddíl, použijte klauzuli PARTITION BY
ve volání funkce za argumentem TABLE
.
To zaručuje, že každý vstupní řádek s každou jedinečnou kombinací hodnot dělicích sloupců bude spotřebován přesně jednou instancí třídy UDTF.
Všimněte si, že kromě jednoduchých odkazů na sloupce přijímá klauzule PARTITION BY
také libovolné výrazy založené na sloupcích vstupní tabulky. Můžete například určit LENGTH
řetězce, extrahovat měsíc z data nebo zřetězit dvě hodnoty.
Je také možné zadat WITH SINGLE PARTITION
místo PARTITION BY
, aby byl požadován pouze jeden oddíl, ve kterém musí být všechny vstupní řádky spotřebovány přesně jednou instancí třídy UDTF.
V každém oddílu můžete volitelně určit požadované pořadí vstupních řádků, které metoda eval
UDTF zpracovává. K tomu zadejte klauzuli ORDER BY
za klauzuli PARTITION BY
nebo WITH SINGLE PARTITION
popsanou výše.
Představte si například následující UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Možnosti dělení můžete zadat při volání UDTF přes vstupní tabulku několika způsoby:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Určete dělení vstupních řádků z metody analyze
Všimněte si, že pro každý z výše uvedených způsobů dělení vstupní tabulky při volání UDF v dotazech SQL existuje odpovídající způsob, jak metoda UDTF analyze
určit stejnou metodu dělení automaticky.
- Místo volání UDTF jako
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
můžete aktualizovat metoduanalyze
, a tím nastavit polepartitionBy=[PartitioningColumn("a")]
a jednoduše volat funkci pomocíSELECT * FROM udtf(TABLE(t))
. - Pomocí stejného tokenu můžete místo zadávání
TABLE(t) WITH SINGLE PARTITION ORDER BY b
v dotazu SQL nastavitanalyze
polewithSinglePartition=true
aorderBy=[OrderingColumn("b")]
a pak jednoduše předatTABLE(t)
. - Místo předávání
TABLE(SELECT a FROM t)
v dotazu SQL můžete nechatanalyze
nastavitselect=[SelectedColumn("a")]
a pak jednoduše předatTABLE(t)
.
V následujícím příkladu analyze
vrátí konstantní výstupní schéma, vybere podmnožinu sloupců ze vstupní tabulky a určí, že vstupní tabulka je rozdělena na několik volání UDTF na základě hodnot date
sloupce:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])