Condividi tramite


Funzioni di tabella definite dall'utente Python (UDF)

Importante

Questa funzionalità si trova in anteprima pubblica in Databricks Runtime 14.3 LTS e versioni successive.

Una funzione di tabella definita dall'utente (UDTF) consente di registrare funzioni che restituiscono tabelle anziché valori scalari. A differenza delle funzioni scalari che restituiscono un singolo valore di risultato da ogni chiamata, ogni UDTF viene invocata nella clausola FROM di un'istruzione SQL e restituisce una tabella intera come output.

Ogni chiamata UDTF può accettare zero o più argomenti. Questi argomenti possono essere espressioni scalari o argomenti di tabella che rappresentano intere tabelle di input.

Sintassi UDTF di base

Apache Spark implementa le funzioni definite dall'utente Python come classi Python con un metodo di eval obbligatorio che usa yield per generare righe di output.

Per usare la tua classe come UDTF, è necessario importare la funzione PySpark udtf. Databricks consiglia di usare questa funzione come decorator e specificare in modo esplicito i nomi e i tipi di campo usando l'opzione returnType (a meno che la classe non definisca un metodo analyze come descritto in una sezione successiva).

La seguente UDTF crea una tabella utilizzando un elenco fisso di due argomenti integer:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registrare una UDTF

Le UDTF (User Defined Table Function) vengono registrate nel SparkSession locale e sono isolate a livello di notebook o processo.

Non è possibile registrare funzioni di tabella definite dall'utente come oggetti nel Catalogo Unity e non è possibile usare funzioni di tabella definite dall'utente con SQL Warehouse.

È possibile registrare un UDTF per l'SparkSession corrente da usare nelle query SQL con la funzione spark.udtf.register(). Specificare un nome per la funzione SQL e la classe UDTF python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Chiamare un UDTF che è registrato

Dopo la registrazione, è possibile usare UDTF in SQL usando il comando magic %sql o la funzione spark.sql():

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Usare Apache Arrow

Se il tipo definito dall'utente riceve una piccola quantità di dati come input, ma restituisce una tabella di grandi dimensioni, Databricks consiglia di usare Apache Arrow. È possibile abilitarla specificando il parametro useArrow quando si dichiara la UDTF.

@udtf(returnType="c1: int, c2: int", useArrow=True)

Elenchi di argomenti variabili - *args e **kwargs

È possibile usare python *args o **kwargs sintassi e implementare la logica per gestire un numero non specificato di valori di input.

Nell'esempio seguente viene restituito lo stesso risultato controllando in modo esplicito la lunghezza e i tipi di input per gli argomenti:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Di seguito è riportato lo stesso esempio, ma si usano argomenti di parole chiave:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definire uno schema statico in fase di registrazione

Il UDTF restituisce delle righe con uno schema di output che comprende una sequenza ordinata di nomi e tipi delle colonne. Se lo schema UDTF deve rimanere sempre lo stesso per tutte le query, è possibile specificare uno schema statico e fisso dopo l'@udtf decorator. Deve essere un StructType:

StructType().add("c1", StringType())

Oppure una stringa DDL che rappresenta un tipo di struct:

c1: string

Calcolare uno schema dinamico in fase di chiamata di funzione

Le funzioni definite dall'utente possono anche calcolare lo schema di output a livello di codice per ogni chiamata a seconda dei valori degli argomenti di input. A tale scopo, definire un metodo statico denominato analyze che accetta zero o più parametri che corrispondono agli argomenti forniti alla chiamata UDTF specifica.

Ogni argomento del metodo analyze è un'istanza della classe AnalyzeArgument che contiene i campi seguenti:

campo classe AnalyzeArgument Descrizione
dataType Tipo del parametro di ingresso come DataType. Per gli argomenti della tabella di input, si tratta di un StructType che rappresenta le colonne della tabella.
value Valore dell'argomento di input come Optional[Any]. Si tratta di None per argomenti di tabella o argomenti scalari letterali che non sono costanti.
isTable Indica se l'argomento di input è una tabella come BooleanType.
isConstantExpression Indica se l'argomento di input è un'espressione costante pieghevole come BooleanType.

Il metodo analyze restituisce un'istanza della classe AnalyzeResult, che include lo schema della tabella dei risultati come StructType più alcuni campi facoltativi. Se UDTF accetta un argomento di tabella di input, il AnalyzeResult può includere anche un modo richiesto per partizionare e ordinare le righe della tabella di input in diverse chiamate UTF, come descritto più avanti.

campo classe AnalyzeResult Descrizione
schema Schema della tabella dei risultati come StructType.
withSinglePartition Indica se inviare tutte le righe di input alla stessa istanza della classe UDTF come un BooleanType.
partitionBy Se impostato su non vuoto, tutte le righe con ogni combinazione univoca di valori delle espressioni di partizionamento vengono utilizzate da un'istanza separata della classe UDTF.
orderBy Se impostato su non vuoto, specifica un ordinamento di righe all'interno di ogni partizione.
select Se impostato su non vuoto, questa è una sequenza di espressioni che l'UDTF specifica a Catalyst di valutare rispetto alle colonne nell'argomento TABLE di input. L'UDTF riceve un attributo di input per ciascun nome nell'elenco nell'ordine in cui sono elencati.

Questo esempio analyze restituisce una colonna di output per ogni parola nella stringa di input.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Trasmetti lo stato alle chiamate future eval

Il metodo analyze può fungere da luogo pratico per eseguire l'inizializzazione e quindi inoltrare i risultati alle future chiamate al metodo eval per la stessa chiamata UDTF.

A tale scopo, creare una sottoclasse di AnalyzeResult e restituire un'istanza della sottoclasse dal metodo analyze. Aggiungere quindi un argomento aggiuntivo al metodo __init__ per accettare tale istanza.

Questo esempio analyze restituisce uno schema di output costante, ma aggiunge informazioni personalizzate nei metadati dei risultati da utilizzare da chiamate future __init__ metodo:

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Generare righe di output

Il metodo eval viene eseguito una sola volta per ogni riga dell'argomento tabella di input (o una sola volta se non viene specificato alcun argomento di tabella), seguito da una chiamata del metodo terminate alla fine. Il metodo restituisce zero o più righe conformi allo schema dei risultati producendo tuple, elenchi o oggetti pyspark.sql.Row.

In questo esempio, una tupla di tre elementi restituirà una riga.

def eval(self, x, y, z):
  yield (x, y, z)

È anche possibile omettere le parentesi:

def eval(self, x, y, z):
  yield x, y, z

Aggiungere una virgola finale per restituire una riga con una sola colonna:

def eval(self, x, y, z):
  yield x,

È anche possibile produrre un oggetto pyspark.sql.Row.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Questo esempio restituisce righe di output dal metodo terminate usando un elenco Python. È possibile archiviare lo stato all'interno della classe dai passaggi precedenti nella valutazione UDTF per questo scopo.

def terminate(self):
  yield [self.x, self.y, self.z]

Passare argomenti scalari a una UDTF (funzione definita dall'utente)

È possibile passare argomenti scalari a un UDTF come espressioni costanti che comprendono valori letterali o funzioni basate su di essi. Per esempio:

SELECT * FROM udtf(42, group => upper("finance_department"));

Passare argomenti di tabella a una funzione di tabella definita dall'utente (UDTF)

Le funzioni tabellari definite dall'utente (UDTF) in Python possono accettare una tabella di input come argomento, oltre agli argomenti di input scalari. Un singolo tipo definito dall'utente può anche accettare un argomento di tabella e più argomenti scalari.

Qualsiasi query SQL può quindi fornire una tabella di input usando la parola chiave TABLE seguita da parentesi che circondano un identificatore di tabella appropriato, ad esempio TABLE(t). In alternativa, è possibile passare una sottoquery di tabella, ad esempio TABLE(SELECT a, b, c FROM t) o TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

L'argomento tabella di input viene quindi rappresentato come argomento pyspark.sql.Row al metodo eval, con una chiamata al metodo eval per ogni riga della tabella di input. È possibile usare annotazioni standard dei campi di colonna PySpark per interagire con le colonne in ogni riga. L'esempio seguente illustra l'importazione esplicita del tipo Row di PySpark e quindi filtrando la tabella passata sul campo id.

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Per eseguire una query sulla funzione, usare la parola chiave SQL TABLE:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Specificare un partizionamento delle righe di input dalle chiamate di funzione.

Quando si chiama una UDTF con un argomento di tabella, qualsiasi query SQL può partizionare la tabella di input in diverse chiamate UDTF in base ai valori di una o più colonne della tabella di input.

Per specificare una partizione, usare la clausola PARTITION BY nella chiamata di funzione dopo l'argomento TABLE. Ciò garantisce che tutte le righe di input con ogni combinazione univoca di valori delle colonne di partizionamento vengano utilizzate da una sola istanza della classe UDTF.

Si noti che oltre ai riferimenti a colonne semplici, la clausola PARTITION BY accetta anche espressioni arbitrarie basate sulle colonne della tabella di input. Ad esempio, è possibile specificare il LENGTH di una stringa, estrarre un mese da una data o concatenare due valori.

È anche possibile specificare WITH SINGLE PARTITION anziché PARTITION BY per richiedere una sola partizione in cui tutte le righe di input devono essere utilizzate da una sola istanza della classe UDTF.

All'interno di ogni partizione, è possibile specificare facoltativamente un ordinamento delle righe di input obbligatorio per il consumo da parte del metodo eval dell'UDTF. A tale scopo, specificare una clausola ORDER BY dopo la clausola PARTITION BY o WITH SINGLE PARTITION descritta in precedenza.

Si consideri, ad esempio, il seguente UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

È possibile specificare le opzioni di partizionamento quando si chiama la UDTF sulla tabella di input in diversi modi.

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Specificare un partizionamento delle righe di input dal metodo analyze

Si noti che per ognuno dei modi precedenti di partizionamento della tabella di input quando si chiamano funzioni definite dall'utente nelle query SQL, esiste un modo corrispondente per il metodo analyze della UDTF per specificare lo stesso metodo di partizionamento automaticamente.

  • Anziché chiamare una UDTF come SELECT * FROM udtf(TABLE(t) PARTITION BY a), è possibile aggiornare il metodo analyze per impostare il campo partitionBy=[PartitioningColumn("a")] e semplicemente chiamare la UDTF usando SELECT * FROM udtf(TABLE(t)).
  • Con lo stesso token, invece di specificare TABLE(t) WITH SINGLE PARTITION ORDER BY b nella query SQL, è possibile impostare analyze i campi withSinglePartition=true e orderBy=[OrderingColumn("b")] e quindi passare semplicemente TABLE(t).
  • Anziché passare TABLE(SELECT a FROM t) nella query SQL, è possibile impostare analyzeselect=[SelectedColumn("a")] e quindi passare semplicemente TABLE(t).

Nell'esempio seguente, analyze restituisce uno schema di output costante, seleziona un sottoinsieme di colonne dalla tabella di input e specifica che la tabella di input viene partizionata in diverse chiamate UDTF basato sui valori della colonna date.

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])