Поделиться через


Пользовательские табличные функции Python (UDTFs)

Важный

Эта функция находится в общедоступной предварительной версии в Databricks Runtime 14.3 LTS и выше.

Определяемая пользователем функция таблицы (UDTF) позволяет регистрировать функции, возвращающие таблицы вместо скалярных значений. В отличие от скалярных функций, возвращающих одно результирующий значение из каждого вызова, каждый UDTF вызывается в предложении FROM инструкции SQL и возвращает всю таблицу в виде выходных данных.

Каждый вызов UDTF может принимать ноль или больше аргументов. Эти аргументы могут быть скалярными выражениями или аргументами таблицы, представляющими всю входную таблицу.

Базовый синтаксис UDTF

Apache Spark реализует UDTF Python в виде классов Python с обязательным методом eval, использующим yield для вывода выходных строк.

Чтобы использовать класс в качестве UDTF, необходимо импортировать функцию PySpark udtf. Databricks рекомендует использовать эту функцию в качестве декоратора и явно указывать имена полей и типы с помощью параметра returnType (если класс не определяет метод analyze, как описано в следующем разделе).

Следующий UDTF создает таблицу с помощью фиксированного списка двух целых аргументов:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Регистрация UDTF

UDTF регистрируются в локальном SparkSession и изолированы на уровне ноутбука или задания.

Вы не можете зарегистрировать UDTF как объекты в каталоге Unity, и UDTF нельзя использовать с хранилищами SQL.

Вы можете зарегистрировать UDTF для текущего SparkSession, чтобы использовать его в запросах SQL, с помощью функции spark.udtf.register(). Укажите имя функции SQL и класса UDTF Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Вызов зарегистрированного UDTF

После регистрации вы можете использовать UDTF в SQL с помощью волшебной команды %sql или функции spark.sql():

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Используйте Apache Arrow

Если UDTF получает небольшой объем данных в качестве входных данных, но выводит большую таблицу, Databricks рекомендует использовать Apache Arrow. Его можно включить, указав параметр useArrow при объявлении UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Списки аргументов переменной — *args и **kwargs

Вы можете использовать синтаксис Python *args или **kwargs и реализовать логику для обработки неопределенного количества входных значений.

В следующем примере возвращается тот же результат при явной проверке длины входных данных и типов аргументов:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Ниже приведен тот же пример, но использование аргументов ключевых слов:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Определение статической схемы во время регистрации

UDTF возвращает строки с выходной схемой, содержащей упорядоченную последовательность имен столбцов и типов. Если схема UDTF всегда должна оставаться одинаковой для всех запросов, можно указать статическую фиксированную схему после декоратора @udtf. Он должен быть StructType:

StructType().add("c1", StringType())

Или строка DDL, представляющая тип структуры:

c1: string

Вычисление динамической схемы во время вызова функции

Функции UDTF также могут вычислять схему вывода программным путем для каждого вызова в зависимости от значений входных аргументов. Для этого определите статический метод с именем analyze, который принимает ноль или больше параметров, которые соответствуют аргументам, предоставленным конкретному вызову UDTF.

Каждый аргумент метода analyze является экземпляром класса AnalyzeArgument, содержащего следующие поля:

поле класса AnalyzeArgument Описание
dataType Тип входного аргумента в виде DataType. Для аргументов входной таблицы это StructType, представляющая столбцы таблицы.
value Значение входного аргумента в виде Optional[Any]. Это None для аргументов таблицы или литеральных скалярных аргументов, которые не являются константами.
isTable Указывает, является ли входной аргумент таблицей в виде BooleanType.
isConstantExpression Указывает, является ли входной аргумент константным свертываемым выражением в виде BooleanType.

Метод analyze возвращает экземпляр класса AnalyzeResult, который включает схему таблицы результатов в виде StructType плюс некоторые необязательные поля. Если UDTF принимает аргумент входной таблицы, то AnalyzeResult также может включать запрошенный способ секционирования и упорядочивания строк входной таблицы в нескольких вызовах UDTF, как описано ниже.

поле класса AnalyzeResult Описание
schema Схема таблицы результатов в виде StructType.
withSinglePartition Следует ли отправлять все входные строки в один и тот же экземпляр класса UDTF, что и BooleanType.
partitionBy Если это значение не пустое, то все строки с каждым уникальным сочетанием значений выражений для секционирования используются отдельным экземпляром класса UDTF.
orderBy Если задано непустое значение, это указывает порядок строк в каждом разделе.
select Если установлено как "непустое", это последовательность выражений, которые UDTF указывает для Catalyst, чтобы вычислить против столбцов во входном аргументе TABLE. UDTF получает один входной атрибут для каждого имени в списке в порядке их перечисления.

В этом analyze примере возвращается один выходной столбец для каждого слова в аргументе входной строки.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Переадресация состояния в будущие вызовы eval

Метод analyze может служить удобным местом для выполнения инициализации, а затем пересылать результаты в будущие вызовы метода eval для того же вызова UDTF.

Для этого создайте подкласс AnalyzeResult и верните экземпляр подкласса из метода analyze. Затем добавьте дополнительный аргумент в метод __init__, чтобы принять этот экземпляр.

Этот analyze пример возвращает константную выходную схему, но добавляет пользовательские сведения в метаданные результатов, которые будут использоваться будущими вызовами методов __init__:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Генерация строк вывода

Метод eval выполняется один раз для каждой строки входного аргумента таблицы (или только один раз, если аргумент таблицы не указан), за которым следует один вызов метода terminate в конце. Любой из методов выводит ноль или больше строк, которые соответствуют схеме результатов, генерируя кортежи, списки или объекты типа pyspark.sql.Row.

В этом примере возвращается строка путем предоставления кортежа из трех элементов.

def eval(self, x, y, z):
  yield (x, y, z)

Можно также опустить скобки:

def eval(self, x, y, z):
  yield x, y, z

Добавьте конечную запятую, чтобы вернуть строку только с одним столбцом:

def eval(self, x, y, z):
  yield x,

Вы также можете получить объект pyspark.sql.Row.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

В этом примере метод terminate генерирует выходные строки с использованием списка Python. Вы можете для этой цели хранить состояние внутри класса из предыдущих шагов при оценке UDTF.

def terminate(self):
  yield [self.x, self.y, self.z]

Передавайте скалярные аргументы в UDTF

Скалярные аргументы можно передать в UDTF в виде константных выражений, состоящих из литеральных значений или функций на основе них. Например:

SELECT * FROM udtf(42, group => upper("finance_department"));

Передача аргументов таблицы в UDTF

Определяемые пользователем табличные функции (UDTFs) в Python могут принимать входную таблицу в качестве аргумента в дополнение к скалярным входным аргументам. Один UDTF также может принимать аргумент таблицы и несколько скалярных аргументов.

Затем любой ЗАПРОС SQL может предоставить входную таблицу с помощью ключевого слова TABLE, за которым следует скобки, окружающие соответствующий идентификатор таблицы, например TABLE(t). Кроме того, можно передать подзапрос таблицы, например TABLE(SELECT a, b, c FROM t) или TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Затем входной аргумент таблицы представляется в виде аргумента pyspark.sql.Row метода eval с одним вызовом метода eval для каждой строки в входной таблице. Для взаимодействия со столбцами в каждой строке можно использовать стандартные заметки полей столбцов PySpark. В следующем примере показано явное импортирование типа PySpark Row, а затем фильтрация переданной таблицы в поле id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Чтобы запросить функцию, используйте ключевое слово TABLE SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Укажите метод секционирования входных строк, полученных из вызовов функций.

При вызове UDTF с аргументом таблицы любой SQL-запрос может выполнять секционирование входной таблицы в нескольких вызовах UDTF на основе значений одного или нескольких столбцов входной таблицы.

Чтобы указать секцию, используйте предложение PARTITION BY в вызове функции после аргумента TABLE. Это гарантирует, что все входные строки с каждым уникальным сочетанием значений столбцов секционирования будут использоваться ровно одним экземпляром класса UDTF.

Обратите внимание, что помимо простых ссылок на столбцы, предложение PARTITION BY также принимает произвольные выражения на основе входных столбцов таблицы. Например, можно указать LENGTH строки, извлечь месяц из даты или объединить два значения.

Кроме того, можно указать WITH SINGLE PARTITION вместо PARTITION BY, чтобы запрашивать только один раздел, в котором все входные строки должны быть обработаны ровно одним экземпляром класса UDTF.

В каждой секции при желании можно указать порядок входных строк, поскольку метод eval UDTF использует их. Для этого предоставьте предложение ORDER BY после предложения PARTITION BY или WITH SINGLE PARTITION, описанного выше.

Например, рассмотрим следующий UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Параметры секционирования можно указать при вызове UDTF по входной таблице несколькими способами.

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Укажите секционирование входных строк из метода analyze

Обратите внимание, что для каждого из указанных выше способов разбиения входной таблицы при вызове табличных функций, определяемых пользователями, в SQL-запросах существует соответствующий способ для метода UDTF analyze, чтобы автоматически определить тот же способ разбиения.

  • Вместо вызова UDTF как SELECT * FROM udtf(TABLE(t) PARTITION BY a)можно обновить метод analyze, чтобы задать поле partitionBy=[PartitioningColumn("a")] и просто вызвать функцию с помощью SELECT * FROM udtf(TABLE(t)).
  • Тем же образом, вместо указания TABLE(t) WITH SINGLE PARTITION ORDER BY b в SQL-запросе, можно настроить analyze так, чтобы он задавал поля withSinglePartition=true и orderBy=[OrderingColumn("b")], а затем просто передать TABLE(t).
  • Вместо передачи TABLE(SELECT a FROM t) в SQL-запросе, можно сделать так, чтобы analyze установил select=[SelectedColumn("a")], а затем просто передать TABLE(t).

В следующем примере analyze возвращает константную выходную схему, выбирает подмножество столбцов из входной таблицы и указывает, что входная таблица секционирована по нескольким вызовам UDTF на основе значений столбца date:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])