Compartir a través de


Funciones de tabla definidas por el usuario de Python (UDTF)

Importante

Esta característica está en versión preliminar pública en Databricks Runtime 14.3 LTS y versiones posteriores.

Las funciones de tabla definidas por un usuario (UDTF) le permiten registrar funciones que devuelven tablas en lugar de valores escalares. A diferencia de las funciones escalares que devuelven un único valor de resultado a partir de cada llamada, cada UDTF se invoca en una cláusula FROM de una instrucción SQL y devuelve una tabla completa como salida.

Cada llamada UDTF puede aceptar cero o más argumentos. Estos argumentos pueden ser expresiones escalares o argumentos de tabla que representan tablas de entrada completas.

Sintaxis UDTF básica

Apache Spark implementa las UDTF de Python como clases de Python con un método obligatorio eval que usa yield para emitir filas de salida.

Para usar la clase como una UDTF, debe importar la función PySpark udtf. Databricks recomienda usar esta función como decorador y especificar explícitamente los nombres y tipos de campo mediante la opción returnType (a menos que la clase defina un método analyze como se describe en una sección posterior).

La siguiente UDTF crea una tabla con una lista fija de dos argumentos enteros:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registro de una UDTF

Las UDTF se registran en SparkSession local y están aisladas en el nivel de cuaderno o trabajo.

No se pueden registrar UDTF como objetos en Unity Catalog y las UDTF no se pueden usar con almacenes de SQL.

Puede registrar una UDTF en el objeto actual SparkSession para su uso en consultas SQL con la función spark.udtf.register(). Proporcione un nombre para la función SQL y la clase UDTF de Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Llamada a una UDTF registrada

Una vez registrado, puede usar la UDTF en SQL mediante el comando magic %sql o la función spark.sql():

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Usar Apache Arrow

Si la UDTF recibe una pequeña cantidad de datos como entrada, pero genera una tabla grande, Databricks recomienda usar Apache Arrow. Puede habilitarlo especificando el parámetro useArrow al declarar la UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Listas de argumentos variables: *args y **kwargs

Puede usar la sintaxis de Python *args o **kwargs e implementar lógica para controlar un número no especificado de valores de entrada.

En el ejemplo siguiente se devuelve el mismo resultado al comprobar explícitamente la longitud de entrada y los tipos de los argumentos:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Este es el mismo ejemplo, pero usando argumentos de palabra clave:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definir un esquema estático en el momento del registro

La UDTF devuelve filas con un esquema de salida que comprende una secuencia ordenada de tipos y nombres de columna. Si el esquema UDTF siempre debe permanecer igual para todas las consultas, puede especificar un esquema fijo estático después del decorador @udtf. Debe ser : StructType

StructType().add("c1", StringType())

O una cadena DDL que representa un tipo de estructura:

c1: string

Calcular un esquema dinámico en el momento de la llamada de función

Las UDF también pueden calcular el esquema de salida mediante programación para cada llamada en función de los valores de los argumentos de entrada. Para ello, defina un método estático denominado analyze que acepte cero o más parámetros que correspondan a los argumentos proporcionados a la llamada UDTF específica.

Cada argumento del método analyze es una instancia de la clase AnalyzeArgument que contiene los campos siguientes:

Campo de clase AnalyzeArgument Descripción
dataType El tipo del argumento de entrada como DataType. Para los argumentos de la tabla de entrada, se trata de un StructType que representa las columnas de la tabla.
value Valor del argumento de entrada como Optional[Any]. Esto es None para argumentos de tabla o argumentos escalares literales que no son constantes.
isTable Si el argumento de entrada es una tabla como BooleanType.
isConstantExpression Si el argumento de entrada es una expresión plegable constante como BooleanType.

El método analyze devuelve una instancia de la clase AnalyzeResult, que incluye el esquema de la tabla de resultados como un StructType más algunos campos opcionales. Si la UDTF acepta un argumento de tabla de entrada, AnalyzeResult también puede incluir una manera solicitada de particionar y ordenar las filas de la tabla de entrada a través de varias llamadas a la UDTF, como se describe más adelante.

Campo de clase AnalyzeResult Descripción
schema Esquema de la tabla de resultados como StructType.
withSinglePartition Indica si se deben enviar todas las filas de entrada a la misma instancia de clase UDTF que BooleanType.
partitionBy Si se establece en no vacío, todas las filas con cada combinación única de valores de las expresiones de partición se consumen mediante una instancia independiente de la clase UDTF.
orderBy Si se establece en no vacío, especifica una ordenación de filas dentro de cada partición.
select Si se establece en no vacío, se trata de una secuencia de expresiones que la UDTF especifica para que Catalyst se evalúe con las columnas del argumento TABLE de entrada. La UDTF recibe un atributo de entrada para cada nombre de la lista en el orden en que se enumeran.

Este ejemplo analyze devuelve una columna de salida para cada palabra del argumento de la cadena de entrada.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Desviar el estado a llamadas futuras eval

El método analyze puede servir como un lugar conveniente para realizar la inicialización y, a continuación, reenviar los resultados a futuras invocaciones de método eval para la misma llamada UDTF.

Para ello, cree una subclase de AnalyzeResult y devuelva una instancia de la subclase desde el método analyze. A continuación, agregue un argumento adicional al método __init__ para aceptar esa instancia.

Este ejemplo analyze devuelve un esquema de salida constante, pero se agrega información personalizada en los metadatos de resultado que las llamadas de método futuras __init__ consumen:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Suspender filas de salida

El método eval se ejecuta una vez para cada fila del argumento de tabla de entrada (o simplemente una vez si no se proporciona ningún argumento de tabla), seguido de una invocación del método terminate al final. El método genera cero o más filas que se ajustan al esquema de resultados al suspender tuplas, listas o objetos pyspark.sql.Row.

En este ejemplo se devuelve una fila proporcionando una tupla de tres elementos:

def eval(self, x, y, z):
  yield (x, y, z)

También puede omitir los paréntesis:

def eval(self, x, y, z):
  yield x, y, z

Agregue una coma final para devolver una fila con una sola columna:

def eval(self, x, y, z):
  yield x,

También puede producir un objeto pyspark.sql.Row.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

En este ejemplo se generan filas de salida del método terminate mediante una lista de Python. Puede almacenar el estado dentro de la clase a partir de pasos anteriores de la evaluación UDTF para este propósito.

def terminate(self):
  yield [self.x, self.y, self.z]

Pasar argumentos escalares a una UDTF

Puede pasar argumentos escalares a una UDTF como expresiones constantes que comprenden valores literales o funciones basadas en ellos. Por ejemplo:

SELECT * FROM udtf(42, group => upper("finance_department"));

Pasar argumentos de tabla a una UDTF

Las UDF de Python pueden aceptar una tabla de entrada como argumento además de argumentos de entrada escalares. Un único UDTF también puede aceptar un argumento de tabla y varios argumentos escalares.

A continuación, cualquier consulta SQL puede proporcionar una tabla de entrada mediante la palabra clave TABLE seguida de un paréntesis alrededor de un identificador de tabla adecuado, como TABLE(t). Como alternativa, puede pasar una subconsulta de tabla, como TABLE(SELECT a, b, c FROM t) o TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

A continuación, el argumento de tabla de entrada se representa como un argumento pyspark.sql.Row al método eval, con una llamada al método eval para cada fila de la tabla de entrada. Puede usar anotaciones de campo de columna PySpark estándar para interactuar con columnas de cada fila. En el ejemplo siguiente se muestra cómo importar explícitamente el tipo PySpark Row y, a continuación, filtrar la tabla pasada en el campo id:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Para consultar la función, use la palabra clave SQL TABLE:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Especificar una creación de particiones de las filas de entrada a partir de las llamadas de función

Al llamar a una UDTF con un argumento de tabla, cualquier consulta SQL puede particionar la tabla de entrada en varias llamadas UDTF basadas en los valores de una o varias columnas de tabla de entrada.

Para especificar una partición, use la cláusula PARTITION BY en la llamada de función después del argumento TABLE. Esto garantiza el consumo de todas las filas de entrada con cada combinación única de valores de las columnas de partición por parte de una sola instancia de la clase UDTF.

Tenga en cuenta que, además de las referencias de columna simples, la cláusula PARTITION BY también acepta expresiones arbitrarias basadas en columnas de tabla de entrada. Por ejemplo, puede especificar el LENGTH de una cadena, extraer un mes de una fecha o concatenar dos valores.

También es posible especificar WITH SINGLE PARTITION en lugar de PARTITION BY para solicitar solo una partición donde una sola instancia de la clase UDTF debe consumir todas las filas de entrada.

Dentro de cada partición, puede especificar opcionalmente una ordenación obligatoria de las filas de entrada a medida que el método eval de la UDTF las consume. Para ello, proporcione una cláusula ORDER BY después de las cláusulas PARTITION BY o WITH SINGLE PARTITION descritas anteriormente.

Por ejemplo, considere la siguiente UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Puede especificar las opciones de creación de particiones al llamar a la UDTF sobre la tabla de entrada de múltiples maneras:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Especificar una creación de particiones de las filas de entrada del método analyze

Tenga en cuenta que, para cada una de las formas anteriores de crear particiones de la tabla de entrada al llamar a UDTF en consultas SQL, hay una forma correspondiente para que el método analyze de la UDTF especifique automáticamente el mismo método de creación de particiones.

  • En lugar de llamar a un UDTF como SELECT * FROM udtf(TABLE(t) PARTITION BY a), puede actualizar el método analyze para establecer el campo partitionBy=[PartitioningColumn("a")] y simplemente llamar a la función mediante SELECT * FROM udtf(TABLE(t)).
  • Por el mismo token, en lugar de especificar TABLE(t) WITH SINGLE PARTITION ORDER BY b en la consulta SQL, puede establecer analyze los campos withSinglePartition=true y orderBy=[OrderingColumn("b")] luego, simplemente pasar TABLE(t).
  • En lugar de pasar TABLE(SELECT a FROM t) en la consulta SQL, puede hacer que analyze defina select=[SelectedColumn("a")] y, a continuación, simplemente pasar TABLE(t).

En el ejemplo siguiente, analyze devuelve un esquema de salida constante, selecciona un subconjunto de columnas de la tabla de entrada y especifica que la tabla de entrada se particiona en varias llamadas UDTF en función de los valores de la columna date:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])