Uživatelem definované funkce tabulek v Pythonu (UDTFs)
Důležité
Tato funkce je ve verzi Public Preview v Databricks Runtime 14.3 LTS a vyšší.
Uživatelem definovaná funkce tabulky (UDTF) umožňuje registrovat funkce, které vracejí tabulky místo skalárních hodnot. Na rozdíl od skalárních funkcí, které vrací jednu výslednou hodnotu z každého volání, je každý UDTF vyvolán v klauzuli příkazu FROM
SQL a vrací celou tabulku jako výstup.
Každé volání UDTF může přijímat nula nebo více argumentů. Tyto argumenty mohou být skalární výrazy nebo argumenty tabulky představující celé vstupní tabulky.
Základní syntaxe UDTF
Apache Spark implementuje definované uživatelem Pythonu jako třídy Pythonu s povinnou eval
metodou, která používá yield
k generování výstupních řádků.
Pokud chcete třídu použít jako UDTF, musíte importovat funkci PySpark udtf
. Databricks doporučuje použít tuto funkci jako dekorátor a explicitně zadat názvy a typy polí pomocí returnType
této možnosti (pokud třída nedefinuje metodu popsanou analyze
v další části).
Následující funkce UDTF vytvoří tabulku s pevným seznamem dvou celých argumentů:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrace UDTF
Funkce definované uživatelem jsou zaregistrované v místním SparkSession
prostředí a jsou izolované na úrovni poznámkového bloku nebo úlohy.
UDTFs nelze zaregistrovat jako objekty v katalogu Unity a UDTFs nelze použít se sklady SQL.
UDTF můžete zaregistrovat do aktuálního SparkSession
stavu pro použití v dotazech SQL s funkcí spark.udtf.register()
. Zadejte název funkce SQL a třídy UDTF Pythonu.
spark.udtf.register("get_sum_diff", GetSumDiff)
Volání registrovaného UDTF
Po registraci můžete použít UDTF v SQL pomocí %sql
příkazu magic nebo spark.sql()
funkce:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Použití Apache Arrow
Pokud váš UDTF přijímá jako vstup malé množství dat, ale vypíše velkou tabulku, databricks doporučuje použít Apache Arrow. Můžete ho povolit zadáním parametru useArrow
při deklarování UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Seznamy argumentů proměnných – *args a **kwargs
K zpracování nezadaného počtu vstupních hodnot můžete použít Python *args
nebo **kwargs
syntaxi a implementovat logiku.
Následující příklad vrátí stejný výsledek při explicitní kontrole vstupní délky a typů argumentů:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Tady je stejný příklad, ale použití argumentů klíčových slov:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definování statického schématu v době registrace
UDTF vrací řádky s výstupním schématem, které tvoří seřazenou sekvenci názvů a typů sloupců. Pokud by schéma UDTF vždy mělo zůstat stejné pro všechny dotazy, můžete za dekorátorem zadat statické a pevné schéma @udtf
. Musí to být StructType
buď:
StructType().add("c1", StringType())
Nebo řetězec DDL představující typ struktury:
c1: string
Výpočet dynamického schématu v době volání funkce
Funkce definované uživatelem můžou také programově vypočítat výstupní schéma pro každé volání v závislosti na hodnotách vstupních argumentů. Chcete-li to provést, definujte statickou metodu, analyze
která přijímá nula nebo více parametrů, které odpovídají argumentům zadaným pro konkrétní volání UDTF.
Každý argument analyze
metody je instance AnalyzeArgument
třídy, která obsahuje následující pole:
AnalyzeArgument Pole třídy |
Popis |
---|---|
dataType |
Typ vstupního argumentu jako DataType . U argumentů vstupní tabulky se jedná o StructType reprezentaci sloupců tabulky. |
value |
Hodnota vstupního argumentu jako Optional[Any] . Toto je None pro argumenty tabulky nebo skalární argumenty literálu, které nejsou konstantní. |
isTable |
Zda vstupní argument je tabulka jako BooleanType . |
isConstantExpression |
Zda vstupní argument je konstantní skládací výraz jako BooleanType . |
Metoda analyze
vrátí instanci AnalyzeResult
třídy, která zahrnuje schéma výsledné tabulky jako StructType
plus některá volitelná pole. Pokud UDTF přijímá argument vstupní tabulky, AnalyzeResult
může také zahrnovat požadovaný způsob rozdělení a pořadí řádků vstupní tabulky napříč několika voláními UDTF, jak je popsáno později.
AnalyzeResult Pole třídy |
Popis |
---|---|
schema |
Schéma výsledné tabulky jako StructType . |
withSinglePartition |
Zda odeslat všechny vstupní řádky do stejné instance třídy UDTF jako BooleanType . |
partitionBy |
Pokud je nastavena na neprázdné, všechny řádky s každou jedinečnou kombinací hodnot výrazů dělení jsou spotřebovány samostatnou instancí UDTF třídy. |
orderBy |
Pokud je nastavená hodnota neprázdná, určuje pořadí řádků v rámci každého oddílu. |
select |
Pokud je nastavená na neprázdnou, jedná se o sekvenci výrazů, které UDTF určuje, aby nástroj Catalyst vyhodnocoval sloupce ve vstupním argumentu TABLE. UDTF obdrží jeden vstupní atribut pro každý název v seznamu v pořadí, v jakém jsou uvedeny. |
Tento analyze
příklad vrátí jeden výstupní sloupec pro každé slovo ve vstupním řetězcovém argumentu.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Přesměrování stavu na budoucí eval
volání
Tato analyze
metoda může sloužit jako vhodné místo k provedení inicializace a následné předání výsledků do budoucích eval
volání metody pro stejné volání UDTF.
Uděláte to tak, že vytvoříte podtřídu AnalyzeResult
a vrátíte instanci podtřídy analyze
z metody.
Pak do metody přidejte další argument __init__
, který tuto instanci přijme.
Tento analyze
příklad vrátí konstantní výstupní schéma, ale přidá vlastní informace do metadat výsledku, která se budou využívat budoucí __init__
volání metody:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Výnos výstupních řádků
Metoda eval
se spustí jednou pro každý řádek argumentu vstupní tabulky (nebo jen jednou, pokud není zadán žádný argument tabulky), za ním následuje jedno vyvolání terminate
metody na konci. Metoda buď vypíše nula nebo více řádků odpovídajících schématu výsledku tím, že získá řazené kolekce členů, seznamy nebo pyspark.sql.Row
objekty.
Tento příklad vrátí řádek zadáním řazené kolekce členů tří prvků:
def eval(self, x, y, z):
yield (x, y, z)
Můžete také vynechat závorky:
def eval(self, x, y, z):
yield x, y, z
Přidejte koncovou čárku, která vrátí řádek pouze s jedním sloupcem:
def eval(self, x, y, z):
yield x,
Můžete také získat pyspark.sql.Row
objekt.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
Tento příklad poskytuje výstupní řádky z terminate
metody pomocí seznamu Pythonu. Stav můžete uložit uvnitř třídy z předchozích kroků v vyhodnocení UDTF pro tento účel.
def terminate(self):
yield [self.x, self.y, self.z]
Předání skalárních argumentů do UDTF
Skalární argumenty můžete předat do UDTF jako konstantní výrazy, které obsahují hodnoty literálů nebo funkce založené na nich. Příklad:
SELECT * FROM udtf(42, group => upper("finance_department"));
Předání argumentů tabulky do UDTF
Uživatelem definované funkce Pythonu můžou kromě skalárních vstupních argumentů přijmout vstupní tabulku jako argumenty. Jeden UDTF může také přijmout argument tabulky a více skalárních argumentů.
Jakýkoli dotaz SQL pak může poskytnout vstupní tabulku pomocí klíčového TABLE
slova následovaného závorky kolem příslušného identifikátoru tabulky, například TABLE(t)
. Alternativně můžete předat poddotaz tabulky, například TABLE(SELECT a, b, c FROM t)
nebo TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
Argument vstupní tabulky je pak reprezentován jako pyspark.sql.Row
argument metody eval
s jedním voláním eval
metody pro každý řádek ve vstupní tabulce. K interakci se sloupci v jednotlivých řádcích můžete použít standardní poznámky polí sloupců PySpark. Následující příklad ukazuje explicitní import typu PySpark Row
a následné filtrování předané tabulky v id
poli:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
K dotazování funkce použijte TABLE
klíčové slovo SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Určení dělení vstupních řádků z volání funkce
Při volání UDTF s argumentem tabulky může každý dotaz SQL rozdělit vstupní tabulku na několik volání UDTF na základě hodnot jednoho nebo více vstupních sloupců tabulky.
Pokud chcete zadat oddíl, použijte PARTITION BY
klauzuli ve volání funkce za argumentem TABLE
.
To zaručuje, že všechny vstupní řádky s každou jedinečnou kombinací hodnot sloupců dělení budou spotřebovány přesně jednou instancí UDTF třídy.
Všimněte si, že kromě jednoduchých odkazů na PARTITION BY
sloupce klauzule také přijímá libovolné výrazy založené na sloupcích vstupní tabulky. Můžete například zadat LENGTH
řetězec, extrahovat měsíc z data nebo zřetězení dvou hodnot.
Je také možné zadat WITH SINGLE PARTITION
místo PARTITION BY
vyžádání pouze jednoho oddílu, kde všechny vstupní řádky musí být spotřebovány přesně jednou instancí UDTF třídy.
V rámci každého oddílu můžete volitelně zadat požadované pořadí vstupních řádků, protože je metoda UDTF eval
využívá. Uděláte to tak, že za klauzulí nebo WITH SINGLE PARTITION
klauzulí popsanou ORDER BY
PARTITION BY
výše zadáte klauzuli.
Představte si například následující UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Možnosti dělení můžete zadat při volání UDTF přes vstupní tabulku několika způsoby:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Určení dělení vstupních řádků z analyze
metody
Všimněte si, že pro každý z výše uvedených způsobů dělení vstupní tabulky při volání UDF v dotazech SQL existuje odpovídající způsob, jak metoda UDTF analyze
určit stejnou metodu dělení automaticky.
- Místo volání UDTF jako
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, můžete aktualizovat metoduanalyze
nastavit polepartitionBy=[PartitioningColumn("a")]
a jednoduše volat funkci pomocíSELECT * FROM udtf(TABLE(t))
. - Pomocí stejného tokenu můžete místo zadání
TABLE(t) WITH SINGLE PARTITION ORDER BY b
v dotazu SQL nastavitanalyze
polewithSinglePartition=true
aorderBy=[OrderingColumn("b")]
pak jednoduše předatTABLE(t)
. - Místo předání
TABLE(SELECT a FROM t)
dotazu SQL můžete nastavitanalyze
select=[SelectedColumn("a")]
a pak jednoduše předatTABLE(t)
.
V následujícím příkladu analyze
vrátí konstantní výstupní schéma, vybere podmnožinu sloupců ze vstupní tabulky a určí, že vstupní tabulka je rozdělena na několik volání UDTF na základě hodnot date
sloupce:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])