Condividi tramite


Che cosa sono le funzioni definite dall'utente ??

Le funzioni definite dall'utente consentono di riutilizzare e condividere codice che estende le funzionalità predefinite in Azure Databricks. Usare funzioni definite dall'utente per eseguire attività specifiche, ad esempio calcoli complessi, trasformazioni o manipolazioni personalizzate dei dati.

Nota

Nei cluster con modalità di accesso condiviso, le funzioni definite dall'utente scalari Python sono supportate in Databricks Runtime 13.3 LTS e versioni successive, mentre le funzioni definite dall'utente Scala sono supportate in Databricks Runtime 14.2 e versioni successive.

Le funzioni definite dall'utente scalari Python possono essere registrate nel catalogo unity usando la sintassi SQL in Databricks Runtime 13.3 LTS e versioni successive. Si veda Funzioni definite dall'utente (UDF) nel catalogo Unity.

Quando è consigliabile usare una funzione definita dall'utente?

Usare funzioni definite dall'utente per la logica difficili da esprimere con le funzioni Apache Spark predefinite. Le funzioni Apache Spark predefinite sono ottimizzate per l'elaborazione distribuita e offrono in genere prestazioni migliori su larga scala. Per altre informazioni, vedere Funzioni.

Databricks consiglia funzioni definite dall'utente per query ad hoc, pulizia manuale dei dati, analisi esplorativa dei dati e operazioni su set di dati di piccole e medie dimensioni. I casi d'uso comuni per le funzioni definite dall'utente includono crittografia dei dati e decrittografia, hashing, analisi JSON e convalida.

Usare i metodi Apache Spark per le operazioni su set di dati di grandi dimensioni e tutti i carichi di lavoro eseguiti regolarmente o continuamente, inclusi processi ETL e operazioni di streaming.

Funzioni definite dall'utente con ambito registrazione e sessione

Le funzioni definite dall'utente create con SQL vengono registrate nel catalogo unity e dispongono delle autorizzazioni associate, mentre le funzioni definite dall'utente create all'interno del notebook sono basate sulla sessione e hanno come ambito l'oggetto SparkSession corrente.

È possibile definire e accedere alle funzioni definite dall'utente basate su sessione usando qualsiasi linguaggio supportato da Azure Databricks. Le funzioni definite dall'utente possono essere scalari o non scalari.

Nota

Attualmente solo le funzioni definite dall'utente scalari SQL e Python registrate nel catalogo unity sono disponibili in DBSQL.

Funzioni definite dall'utente scalari

Le funzioni definite dall'utente scalari operano su una singola riga e restituiscono un singolo valore per ogni riga. Nell'esempio seguente viene usata una funzione definita dall'utente scalare per calcolare la lunghezza di ogni nome in una name colonna e aggiungere il valore in una nuova colonna name_length:

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Per implementare questa funzionalità in un notebook di Databricks usando PySpark:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
   return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

Per altre informazioni, vedere Funzioni definite dall'utente in Unity Catalog e Funzioni scalari definite dall'utente - Python.

Funzioni di aggregazione definite dall'utente (UDAF)

Le funzioni di aggregazione definite dall'utente operano su più righe e restituiscono un singolo risultato aggregato. Nell'esempio seguente viene definito un valore UDAF che aggrega i punteggi.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
    return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
              .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Vedere funzioni pandas definite dall'utente per le funzioni di aggregazione python e definite dall'utente - Scala.

Funzioni di tabella definite dall'utente Python (UDF)

Importante

Questa funzionalità è disponibile in anteprima pubblica.

Nota

Le funzioni definite dall'utente Python sono disponibili in Databricks Runtime 14.3 LTS e versioni successive.

Le funzioni di tabella definite dall'utente python possono restituire più righe e colonne per ogni riga di input. Nell'esempio seguente ogni valore nella colonna score corrisponde a un elenco di categorie. Un tipo definito dall'utente viene definito per suddividere l'elenco delimitato da virgole in più righe. Vedere Funzioni di tabella definite dall'utente Python (UDF)

+-------+-------+-----------------+
| name  | score |   categories    |
+-------+-------+-----------------+
| alice |  10.0 |  math,science   |
|  bob  |  20.0 |  history,math   |
| carol |  30.0 | science,history |
| dave  |  40.0 |    math,art     |
|  eve  |  50.0 |  science,art    |
+-------+-------+-----------------+

from pyspark.sql.functions import udtf

@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
    def eval(self, name: str, score: float, categories: str):
        category_list = categories.split(',')
        for category in category_list:
            yield (name, score, category)

# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name  | score | category |
+-------+-------+----------+
| alice |  10.0 |   math   |
| alice |  10.0 | science  |
|  bob  |  20.0 | history  |
|  bob  |  20.0 |   math   |
| carol |  30.0 | science  |
| carol |  30.0 | history  |
| dave  |  40.0 |   math   |
| dave  |  40.0 |   art    |
|  eve  |  50.0 | science  |
|  eve  |  50.0 |   art    |
+-------+-------+----------+

Considerazioni sulle prestazioni

  • Le funzioni predefinite e le funzioni definite dall'utente SQL sono l'opzione più efficiente disponibile.
  • Le funzioni definite dall'utente Scala sono in genere più veloci man mano che vengono eseguite all'interno della JVM (Java Virtual Machine) ed evitano il sovraccarico dello spostamento e dell'uscita dei dati dalla JVM.
  • Le funzioni definite dall'utente Python e le funzioni definite dall'utente Pandas tendono a essere più lente rispetto alle funzioni definite dall'utente scala perché richiedono che i dati vengano serializzati e spostati dalla JVM all'interprete Python. Le funzioni definite dall'utente Pandas fino a 100 volte più velocemente rispetto alle funzioni definite dall'utente Python perché usano Apache Arrow per ridurre i costi di serializzazione.