Sdílet prostřednictvím


Uživatelem definované funkce tabulek v Pythonu (UDTFs)

Důležitý

Tato funkce je ve verzi Public Preview v Databricks Runtime 14.3 LTS a vyšší.

Uživatelem definovaná funkce tabulky (UDTF) umožňuje registrovat funkce, které vracejí tabulky místo skalárních hodnot. Na rozdíl od skalárních funkcí, které vrací jednu výslednou hodnotu z každého volání, je každý UDTF vyvolán v klauzuli FROM příkazu SQL a vrací celou tabulku jako výstup.

Každé volání UDTF může přijímat nula nebo více argumentů. Tyto argumenty mohou být skalární výrazy nebo argumenty tabulky představující celé vstupní tabulky.

Základní syntaxe UDTF

Apache Spark implementuje Python UDTF jako třídy Pythonu s nezbytnou metodou eval, která používá yield k generování výstupních řádků.

Pokud chcete třídu použít jako UDTF, musíte importovat funkci PySpark udtf. Databricks doporučuje používat tuto funkci jako dekorátor a explicitně zadávat názvy a typy polí pomocí možnosti returnType (pokud třída nedefinuje metodu analyze, jak je popsáno v další části).

Následující funkce UDTF vytvoří tabulku s pevným seznamem dvou celých argumentů:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Zaregistrujte UDTF

UDTFs jsou registrovány v místním SparkSession a jsou izolovány na úrovni poznámkového bloku nebo úlohy.

UDTFs nelze zaregistrovat jako objekty v katalogu Unity a UDTFs nelze použít se sklady SQL.

UDTF můžete zaregistrovat do aktuálního SparkSession pro použití v dotazech SQL pomocí funkce spark.udtf.register(). Zadejte název funkce SQL a třídy UDTF Pythonu.

spark.udtf.register("get_sum_diff", GetSumDiff)

Zavolejte registrované UDTF

Po registraci můžete použít UDTF v SQL pomocí kouzelného příkazu %sql nebo funkce spark.sql().

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Použití Apache Arrow

Pokud váš UDTF přijímá jako vstup malé množství dat, ale vypíše velkou tabulku, databricks doporučuje použít Apache Arrow. Můžete ho povolit zadáním parametru useArrow při deklarování UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Seznamy argumentů proměnných – *args a **kwargs

K zpracování nezadaného počtu vstupních hodnot můžete použít *args Pythonu nebo syntaxi **kwargs a implementovat logiku.

Následující příklad vrátí stejný výsledek při explicitní kontrole vstupní délky a typů argumentů:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Tady je stejný příklad, ale použití argumentů klíčových slov:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definování statického schématu v době registrace

UDTF vrací řádky s výstupním schématem, které tvoří seřazenou sekvenci názvů a typů sloupců. Pokud by schéma UDTF vždy mělo zůstat stejné pro všechny dotazy, můžete za @udtf dekorátoru zadat statické pevné schéma. Musí se jednat o StructType:

StructType().add("c1", StringType())

Nebo řetězec DDL představující typ struktury:

c1: string

Výpočet dynamického schématu v době volání funkce

Uživatelem definované tabulkové funkce (UDTFs) mohou také programově vypočítat výstupní schéma pro každý hovor na základě hodnot vstupních argumentů. Chcete-li to provést, definujte statickou metodu s názvem analyze, která přijímá nula nebo více parametrů, které odpovídají argumentům zadaným pro konkrétní volání UDTF.

Každý argument analyze metody je instance třídy AnalyzeArgument, která obsahuje následující pole:

pole třídy AnalyzeArgument Popis
dataType Typ vstupního argumentu jako DataType. U argumentů vstupní tabulky se jedná o StructType představující sloupce tabulky.
value Hodnota vstupního argumentu jako Optional[Any]. To je None pro argumenty tabulky nebo literální skalární argumenty, které nejsou konstantní.
isTable Zda je vstupní argument tabulka jako je BooleanType.
isConstantExpression Zda vstupní argument je konstantní skládací výraz jako BooleanType.

Metoda analyze vrátí instanci třídy AnalyzeResult, která zahrnuje schéma výsledné tabulky jako StructType plus některá volitelná pole. Pokud UDTF přijímá argument vstupní tabulky, pak AnalyzeResult může také obsahovat požadovaný způsob rozdělení a pořadí řádků vstupní tabulky napříč několika voláními UDTF, jak je popsáno později.

pole třídy AnalyzeResult Popis
schema Schéma výsledné tabulky jako StructType.
withSinglePartition Zda odeslat všechny vstupní řádky do stejné instance třídy UDTF jako BooleanType.
partitionBy Pokud je nastavena na neprázdnou hodnotu, všechny řádky s každou jedinečnou kombinací hodnot partičních výrazů jsou zpracovány samostatnou instancí třídy UDTF.
orderBy Pokud je nastavená hodnota neprázdná, určuje uspořádání řádků v rámci každého oddílu.
select Pokud je nastavená na neprázdnou, jedná se o sekvenci výrazů, které UDTF určuje, aby nástroj Catalyst vyhodnotil sloupce ve vstupním TABLE argumentu. UDTF obdrží jeden vstupní atribut pro každý název v seznamu v pořadí, v jakém jsou uvedeny.

Tento analyze příklad vrátí jeden výstupní sloupec pro každé slovo ve vstupním řetězcovém argumentu.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Předání stavu do budoucích hovorů eval

Metoda analyze může sloužit jako vhodné místo k provedení inicializace a následnému předání výsledků do budoucího volání metody eval pro stejný hovor UDTF.

Uděláte to tak, že vytvoříte podtřídu AnalyzeResult a vrátíte instanci podtřídy z analyze metody. Potom do metody __init__ přidejte další argument, který tuto instanci přijme.

Tento analyze příklad vrátí konstantní výstupní schéma, ale přidá vlastní informace do metadat výsledku, která budou využita budoucími voláními metody __init__:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Generování výstupních řádků

Metoda eval se spustí jednou pro každý řádek argumentu vstupní tabulky (nebo jen jednou, pokud není zadán žádný argument tabulky), za ním následuje jedno vyvolání terminate metody na konci. Metoda buď vypíše nula nebo více řádků, které odpovídají výslednému schématu tím, že vrátí řazené kolekce členů, seznamy nebo pyspark.sql.Row objekty.

Tento příklad vrací řádek pomocí trojice tří prvků.

def eval(self, x, y, z):
  yield (x, y, z)

Můžete také vynechat závorky:

def eval(self, x, y, z):
  yield x, y, z

Přidejte koncovou čárku, která vrátí řádek pouze s jedním sloupcem:

def eval(self, x, y, z):
  yield x,

Můžete také získat objekt pyspark.sql.Row.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Tento příklad poskytuje výstupní řádky z metody terminate pomocí seznamu Pythonu. Stav můžete uložit uvnitř třídy z předchozích kroků při vyhodnocování UDTF, a to pro tento účel.

def terminate(self):
  yield [self.x, self.y, self.z]

Předání skalárních argumentů do UDTF

Skalární argumenty můžete předat do UDTF jako konstantní výrazy, které obsahují hodnoty literálů nebo funkce založené na nich. Například:

SELECT * FROM udtf(42, group => upper("finance_department"));

Předání argumentů tabulky do UDTF

Uživatelsky definované tabulkové funkce Pythonu mohou kromě skalárních vstupních argumentů přijmout také vstupní tabulku jako argument. Jeden UDTF může také přijmout argument tabulky a více skalárních argumentů.

Každý dotaz SQL pak může poskytnout vstupní tabulku pomocí klíčového slova TABLE následované závorky kolem příslušného identifikátoru tabulky, jako je TABLE(t). Alternativně můžete předat poddotaz tabulky, například TABLE(SELECT a, b, c FROM t) nebo TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Argument vstupní tabulky je pak reprezentován jako pyspark.sql.Row argument metody eval s jedním voláním metody eval pro každý řádek ve vstupní tabulce. K interakci se sloupci v jednotlivých řádcích můžete použít standardní poznámky polí sloupců PySpark. Následující příklad ukazuje explicitní import typu PySpark Row a následné filtrování předané tabulky v poli id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

K dotazování funkce použijte klíčové slovo TABLE SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Uveďte rozdělení vstupních řádků při volání funkce

Při volání UDTF s argumentem tabulky může každý dotaz SQL rozdělit vstupní tabulku na několik volání UDTF na základě hodnot jednoho nebo více vstupních sloupců tabulky.

Pokud chcete zadat oddíl, použijte klauzuli PARTITION BY ve volání funkce za argumentem TABLE. To zaručuje, že každý vstupní řádek s každou jedinečnou kombinací hodnot dělicích sloupců bude spotřebován přesně jednou instancí třídy UDTF.

Všimněte si, že kromě jednoduchých odkazů na sloupce přijímá klauzule PARTITION BY také libovolné výrazy založené na sloupcích vstupní tabulky. Můžete například určit LENGTH řetězce, extrahovat měsíc z data nebo zřetězit dvě hodnoty.

Je také možné zadat WITH SINGLE PARTITION místo PARTITION BY, aby byl požadován pouze jeden oddíl, ve kterém musí být všechny vstupní řádky spotřebovány přesně jednou instancí třídy UDTF.

V každém oddílu můžete volitelně určit požadované pořadí vstupních řádků, které metoda eval UDTF zpracovává. K tomu zadejte klauzuli ORDER BY za klauzuli PARTITION BY nebo WITH SINGLE PARTITION popsanou výše.

Představte si například následující UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Možnosti dělení můžete zadat při volání UDTF přes vstupní tabulku několika způsoby:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Určete dělení vstupních řádků z metody analyze

Všimněte si, že pro každý z výše uvedených způsobů dělení vstupní tabulky při volání UDF v dotazech SQL existuje odpovídající způsob, jak metoda UDTF analyze určit stejnou metodu dělení automaticky.

  • Místo volání UDTF jako SELECT * FROM udtf(TABLE(t) PARTITION BY a)můžete aktualizovat metodu analyze, a tím nastavit pole partitionBy=[PartitioningColumn("a")] a jednoduše volat funkci pomocí SELECT * FROM udtf(TABLE(t)).
  • Pomocí stejného tokenu můžete místo zadávání TABLE(t) WITH SINGLE PARTITION ORDER BY b v dotazu SQL nastavit analyze pole withSinglePartition=true a orderBy=[OrderingColumn("b")] a pak jednoduše předat TABLE(t).
  • Místo předávání TABLE(SELECT a FROM t) v dotazu SQL můžete nechat analyze nastavit select=[SelectedColumn("a")] a pak jednoduše předat TABLE(t).

V následujícím příkladu analyze vrátí konstantní výstupní schéma, vybere podmnožinu sloupců ze vstupní tabulky a určí, že vstupní tabulka je rozdělena na několik volání UDTF na základě hodnot date sloupce:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])