Funciones de tabla definidas por el usuario de Python (UDTF)
Importante
Esta característica está en versión preliminar pública en Databricks Runtime 14.3 LTS y versiones posteriores.
Las funciones de tabla definidas por un usuario (UDTF) le permiten registrar funciones que devuelven tablas en lugar de valores escalares. A diferencia de las funciones escalares que devuelven un único valor de resultado a partir de cada llamada, cada UDTF se invoca en una cláusula FROM
de una instrucción SQL y devuelve una tabla completa como salida.
Cada llamada UDTF puede aceptar cero o más argumentos. Estos argumentos pueden ser expresiones escalares o argumentos de tabla que representan tablas de entrada completas.
Sintaxis UDTF básica
Apache Spark implementa las UDTF de Python como clases de Python con un método obligatorio eval
que usa yield
para emitir filas de salida.
Para usar la clase como una UDTF, debe importar la función PySpark udtf
. Databricks recomienda usar esta función como decorador y especificar explícitamente los nombres y tipos de campo mediante la opción returnType
(a menos que la clase defina un método analyze
como se describe en una sección posterior).
La siguiente UDTF crea una tabla con una lista fija de dos argumentos enteros:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registro de una UDTF
Las UDTF se registran en SparkSession
local y están aisladas en el nivel de cuaderno o trabajo.
No se pueden registrar UDTF como objetos en Unity Catalog y las UDTF no se pueden usar con almacenes de SQL.
Puede registrar una UDTF en el objeto actual SparkSession
para su uso en consultas SQL con la función spark.udtf.register()
. Proporcione un nombre para la función SQL y la clase UDTF de Python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Llamada a una UDTF registrada
Una vez registrado, puede usar la UDTF en SQL mediante el comando magic %sql
o la función spark.sql()
:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Usar Apache Arrow
Si la UDTF recibe una pequeña cantidad de datos como entrada, pero genera una tabla grande, Databricks recomienda usar Apache Arrow. Puede habilitarlo especificando el parámetro useArrow
al declarar la UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Listas de argumentos variables: *args y **kwargs
Puede usar la sintaxis de Python *args
o **kwargs
e implementar lógica para controlar un número no especificado de valores de entrada.
En el ejemplo siguiente se devuelve el mismo resultado al comprobar explícitamente la longitud de entrada y los tipos de los argumentos:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Este es el mismo ejemplo, pero usando argumentos de palabra clave:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definir un esquema estático en el momento del registro
La UDTF devuelve filas con un esquema de salida que comprende una secuencia ordenada de tipos y nombres de columna. Si el esquema UDTF siempre debe permanecer igual para todas las consultas, puede especificar un esquema fijo estático después del decorador @udtf
. Debe ser : StructType
StructType().add("c1", StringType())
O una cadena DDL que representa un tipo de estructura:
c1: string
Calcular un esquema dinámico en el momento de la llamada de función
Las UDF también pueden calcular el esquema de salida mediante programación para cada llamada en función de los valores de los argumentos de entrada. Para ello, defina un método estático denominado analyze
que acepte cero o más parámetros que correspondan a los argumentos proporcionados a la llamada UDTF específica.
Cada argumento del método analyze
es una instancia de la clase AnalyzeArgument
que contiene los campos siguientes:
Campo de clase AnalyzeArgument |
Descripción |
---|---|
dataType |
El tipo del argumento de entrada como DataType . Para los argumentos de la tabla de entrada, se trata de un StructType que representa las columnas de la tabla. |
value |
Valor del argumento de entrada como Optional[Any] . Esto es None para argumentos de tabla o argumentos escalares literales que no son constantes. |
isTable |
Si el argumento de entrada es una tabla como BooleanType . |
isConstantExpression |
Si el argumento de entrada es una expresión plegable constante como BooleanType . |
El método analyze
devuelve una instancia de la clase AnalyzeResult
, que incluye el esquema de la tabla de resultados como un StructType
más algunos campos opcionales. Si la UDTF acepta un argumento de tabla de entrada, AnalyzeResult
también puede incluir una manera solicitada de particionar y ordenar las filas de la tabla de entrada a través de varias llamadas a la UDTF, como se describe más adelante.
Campo de clase AnalyzeResult |
Descripción |
---|---|
schema |
Esquema de la tabla de resultados como StructType . |
withSinglePartition |
Indica si se deben enviar todas las filas de entrada a la misma instancia de clase UDTF que BooleanType . |
partitionBy |
Si se establece en no vacío, todas las filas con cada combinación única de valores de las expresiones de partición se consumen mediante una instancia independiente de la clase UDTF. |
orderBy |
Si se establece en no vacío, especifica una ordenación de filas dentro de cada partición. |
select |
Si se establece en no vacío, se trata de una secuencia de expresiones que la UDTF especifica para que Catalyst se evalúe con las columnas del argumento TABLE de entrada. La UDTF recibe un atributo de entrada para cada nombre de la lista en el orden en que se enumeran. |
Este ejemplo analyze
devuelve una columna de salida para cada palabra del argumento de la cadena de entrada.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Desviar el estado a llamadas futuras eval
El método analyze
puede servir como un lugar conveniente para realizar la inicialización y, a continuación, reenviar los resultados a futuras invocaciones de método eval
para la misma llamada UDTF.
Para ello, cree una subclase de AnalyzeResult
y devuelva una instancia de la subclase desde el método analyze
.
A continuación, agregue un argumento adicional al método __init__
para aceptar esa instancia.
Este ejemplo analyze
devuelve un esquema de salida constante, pero se agrega información personalizada en los metadatos de resultado que las llamadas de método futuras __init__
consumen:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Suspender filas de salida
El método eval
se ejecuta una vez para cada fila del argumento de tabla de entrada (o simplemente una vez si no se proporciona ningún argumento de tabla), seguido de una invocación del método terminate
al final. El método genera cero o más filas que se ajustan al esquema de resultados al suspender tuplas, listas o objetos pyspark.sql.Row
.
En este ejemplo se devuelve una fila proporcionando una tupla de tres elementos:
def eval(self, x, y, z):
yield (x, y, z)
También puede omitir los paréntesis:
def eval(self, x, y, z):
yield x, y, z
Agregue una coma final para devolver una fila con una sola columna:
def eval(self, x, y, z):
yield x,
También puede producir un objeto pyspark.sql.Row
.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
En este ejemplo se generan filas de salida del método terminate
mediante una lista de Python. Puede almacenar el estado dentro de la clase a partir de pasos anteriores de la evaluación UDTF para este propósito.
def terminate(self):
yield [self.x, self.y, self.z]
Pasar argumentos escalares a una UDTF
Puede pasar argumentos escalares a una UDTF como expresiones constantes que comprenden valores literales o funciones basadas en ellos. Por ejemplo:
SELECT * FROM udtf(42, group => upper("finance_department"));
Pasar argumentos de tabla a una UDTF
Las UDF de Python pueden aceptar una tabla de entrada como argumento además de argumentos de entrada escalares. Un único UDTF también puede aceptar un argumento de tabla y varios argumentos escalares.
A continuación, cualquier consulta SQL puede proporcionar una tabla de entrada mediante la palabra clave TABLE
seguida de un paréntesis alrededor de un identificador de tabla adecuado, como TABLE(t)
. Como alternativa, puede pasar una subconsulta de tabla, como TABLE(SELECT a, b, c FROM t)
o TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
A continuación, el argumento de tabla de entrada se representa como un argumento pyspark.sql.Row
al método eval
, con una llamada al método eval
para cada fila de la tabla de entrada. Puede usar anotaciones de campo de columna PySpark estándar para interactuar con columnas de cada fila. En el ejemplo siguiente se muestra cómo importar explícitamente el tipo PySpark Row
y, a continuación, filtrar la tabla pasada en el campo id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Para consultar la función, use la palabra clave SQL TABLE
:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Especificar una creación de particiones de las filas de entrada a partir de las llamadas de función
Al llamar a una UDTF con un argumento de tabla, cualquier consulta SQL puede particionar la tabla de entrada en varias llamadas UDTF basadas en los valores de una o varias columnas de tabla de entrada.
Para especificar una partición, use la cláusula PARTITION BY
en la llamada de función después del argumento TABLE
.
Esto garantiza el consumo de todas las filas de entrada con cada combinación única de valores de las columnas de partición por parte de una sola instancia de la clase UDTF.
Tenga en cuenta que, además de las referencias de columna simples, la cláusula PARTITION BY
también acepta expresiones arbitrarias basadas en columnas de tabla de entrada. Por ejemplo, puede especificar el LENGTH
de una cadena, extraer un mes de una fecha o concatenar dos valores.
También es posible especificar WITH SINGLE PARTITION
en lugar de PARTITION BY
para solicitar solo una partición donde una sola instancia de la clase UDTF debe consumir todas las filas de entrada.
Dentro de cada partición, puede especificar opcionalmente una ordenación obligatoria de las filas de entrada a medida que el método eval
de la UDTF las consume. Para ello, proporcione una cláusula ORDER BY
después de las cláusulas PARTITION BY
o WITH SINGLE PARTITION
descritas anteriormente.
Por ejemplo, considere la siguiente UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Puede especificar las opciones de creación de particiones al llamar a la UDTF sobre la tabla de entrada de múltiples maneras:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Especificar una creación de particiones de las filas de entrada del método analyze
Tenga en cuenta que, para cada una de las formas anteriores de crear particiones de la tabla de entrada al llamar a UDTF en consultas SQL, hay una forma correspondiente para que el método analyze
de la UDTF especifique automáticamente el mismo método de creación de particiones.
- En lugar de llamar a un UDTF como
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, puede actualizar el métodoanalyze
para establecer el campopartitionBy=[PartitioningColumn("a")]
y simplemente llamar a la función medianteSELECT * FROM udtf(TABLE(t))
. - Por el mismo token, en lugar de especificar
TABLE(t) WITH SINGLE PARTITION ORDER BY b
en la consulta SQL, puede estableceranalyze
los camposwithSinglePartition=true
yorderBy=[OrderingColumn("b")]
luego, simplemente pasarTABLE(t)
. - En lugar de pasar
TABLE(SELECT a FROM t)
en la consulta SQL, puede hacer queanalyze
definaselect=[SelectedColumn("a")]
y, a continuación, simplemente pasarTABLE(t)
.
En el ejemplo siguiente, analyze
devuelve un esquema de salida constante, selecciona un subconjunto de columnas de la tabla de entrada y especifica que la tabla de entrada se particiona en varias llamadas UDTF en función de los valores de la columna date
:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])