Compartir a través de


¿Qué son las funciones definidas por el usuario (UDF)?

Las funciones definidas por el usuario (UDF) permiten reutilizar y compartir código que amplía la funcionalidad integrada en Azure Databricks. Use UDF para realizar tareas específicas, como cálculos complejos, transformaciones o manipulaciones de datos personalizadas.

Nota:

En clústeres con modo de acceso compartido, las UDF escalares de Python se admiten en Databricks Runtime 13.3 LTS y versiones posteriores, mientras que las UDF de Scala se admiten en Databricks Runtime 14.2 y versiones posteriores.

Las UDF escalares de Python se pueden registrar en el Catálogo de Unity mediante la sintaxis SQL en Databricks Runtime 13.3 LTS y versiones posteriores. Consulte Funciones definidas por el usuario (UDF) en Unity Catalog.

¿Cuándo debe usar una UDF?

Use UDF para la lógica que es difícil de expresar con funciones integradas de Apache Spark. Las funciones integradas de Apache Spark están optimizadas para el procesamiento distribuido y, por lo general, ofrecen un mejor rendimiento a escala. Para obtener más información, vea Funciones.

Databricks recomienda UDF para consultas ad hoc, limpieza manual de datos, análisis exploratorio de datos y operaciones en conjuntos de datos pequeños a medianos. Entre los casos de uso comunes para las UDF se incluyen el cifrado y el descifrado de datos, el hash, el análisis de JSON y la validación.

Use métodos de Apache Spark para las operaciones en conjuntos de datos muy grandes y todas las cargas de trabajo que se ejecuten periódica o continuamente, incluidos los trabajos ETL y las operaciones de streaming.

UDF con ámbito de sesión y registrados

Las UDF creadas con SQL se registran en el Catálogo de Unity y tienen permisos asociados, mientras que las UDF creadas en el cuaderno se basan en la sesión y se limitan a SparkSession actual.

Puede definir y acceder a las UDF basadas en sesión mediante cualquier lenguaje compatible con Azure Databricks. Las UDF pueden ser escalares o no escalares.

Nota:

Actualmente, solo las UDF escalares de SQL y Python registradas en el catálogo de Unity están disponibles en DBSQL.

UDF escalares

Las UDF escalares funcionan en una sola fila y devuelven un único valor para cada fila. En el ejemplo siguiente se usa una UDF escalar para calcular la longitud de cada nombre de una name columna y agregar el valor en una nueva columna name_length:

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Para implementarlo en un cuaderno de Databricks mediante PySpark:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
   return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

Para obtener más información, consulte Funciones definidas por el usuario (UDF) en el catálogo de Unity y funciones escalares definidas por el usuario: Python.

Funciones de agregado definidas por el usuario (UDAF)

Las funciones de agregado definidas por el usuario (UDAFs) funcionan en varias filas y devuelven un único resultado agregado. En el ejemplo siguiente, se define una UDAF que agrega puntuaciones.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
    return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
              .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Consulte funciones definidas por el usuario de Pandas para Python y funciones de agregado definidas por el usuario: Scala.

Funciones de tabla definidas por el usuario de Python (UDTF)

Importante

Esta característica está en versión preliminar pública.

Nota:

Las UDF de Python están disponibles en Databricks Runtime 14.3 LTS y versiones posteriores.

Las funciones de tabla definidas por el usuario (UDF) de Python pueden devolver varias filas y columnas para cada fila de entrada. En el ejemplo siguiente, cada valor de la columna de puntuación corresponde a una lista de categorías. Se define un UDTF para dividir la lista separada por comas en varias filas. Consulte Funciones de tabla definidas por el usuario (UDF) de Python.

+-------+-------+-----------------+
| name  | score |   categories    |
+-------+-------+-----------------+
| alice |  10.0 |  math,science   |
|  bob  |  20.0 |  history,math   |
| carol |  30.0 | science,history |
| dave  |  40.0 |    math,art     |
|  eve  |  50.0 |  science,art    |
+-------+-------+-----------------+

from pyspark.sql.functions import udtf

@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
    def eval(self, name: str, score: float, categories: str):
        category_list = categories.split(',')
        for category in category_list:
            yield (name, score, category)

# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name  | score | category |
+-------+-------+----------+
| alice |  10.0 |   math   |
| alice |  10.0 | science  |
|  bob  |  20.0 | history  |
|  bob  |  20.0 |   math   |
| carol |  30.0 | science  |
| carol |  30.0 | history  |
| dave  |  40.0 |   math   |
| dave  |  40.0 |   art    |
|  eve  |  50.0 | science  |
|  eve  |  50.0 |   art    |
+-------+-------+----------+

Consideraciones sobre el rendimiento

  • Las funciones integradas y las UDF de SQL son la opción más eficaz disponible.
  • Las UDF de Scala suelen ser más rápidas a medida que se ejecutan dentro de la máquina virtual Java (JVM) y evitan la sobrecarga de mover datos dentro y fuera de la JVM.
  • Las UDF de Python y las UDF de Pandas tienden a ser más lentas que las UDF de Scala porque requieren que los datos se serialicen y se muevan de la JVM al intérprete de Python. UDF de Pandas hasta 100 veces más rápidas que las UDF de Python porque usan Apache Arrow para reducir los costos de serialización.