Funzioni di tabella definite dall'utente Python (UDF)
Importante
Questa funzionalità si trova in anteprima pubblica in Databricks Runtime 14.3 LTS e versioni successive.
Una funzione di tabella definita dall'utente (UDTF) consente di registrare funzioni che restituiscono tabelle anziché valori scalari. A differenza delle funzioni scalari che restituiscono un singolo valore di risultato da ogni chiamata, ogni UDTF viene invocata nella clausola FROM
di un'istruzione SQL e restituisce una tabella intera come output.
Ogni chiamata UDTF può accettare zero o più argomenti. Questi argomenti possono essere espressioni scalari o argomenti di tabella che rappresentano intere tabelle di input.
Sintassi UDTF di base
Apache Spark implementa le funzioni definite dall'utente Python come classi Python con un metodo di eval
obbligatorio che usa yield
per generare righe di output.
Per usare la tua classe come UDTF, è necessario importare la funzione PySpark udtf
. Databricks consiglia di usare questa funzione come decorator e specificare in modo esplicito i nomi e i tipi di campo usando l'opzione returnType
(a meno che la classe non definisca un metodo analyze
come descritto in una sezione successiva).
La seguente UDTF crea una tabella utilizzando un elenco fisso di due argomenti integer:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrare una UDTF
Le UDTF (User Defined Table Function) vengono registrate nel SparkSession
locale e sono isolate a livello di notebook o processo.
Non è possibile registrare funzioni di tabella definite dall'utente come oggetti nel Catalogo Unity e non è possibile usare funzioni di tabella definite dall'utente con SQL Warehouse.
È possibile registrare un UDTF per l'SparkSession
corrente da usare nelle query SQL con la funzione spark.udtf.register()
. Specificare un nome per la funzione SQL e la classe UDTF python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Chiamare un UDTF che è registrato
Dopo la registrazione, è possibile usare UDTF in SQL usando il comando magic %sql
o la funzione spark.sql()
:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Usare Apache Arrow
Se il tipo definito dall'utente riceve una piccola quantità di dati come input, ma restituisce una tabella di grandi dimensioni, Databricks consiglia di usare Apache Arrow. È possibile abilitarla specificando il parametro useArrow
quando si dichiara la UDTF.
@udtf(returnType="c1: int, c2: int", useArrow=True)
Elenchi di argomenti variabili - *args e **kwargs
È possibile usare python *args
o **kwargs
sintassi e implementare la logica per gestire un numero non specificato di valori di input.
Nell'esempio seguente viene restituito lo stesso risultato controllando in modo esplicito la lunghezza e i tipi di input per gli argomenti:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Di seguito è riportato lo stesso esempio, ma si usano argomenti di parole chiave:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definire uno schema statico in fase di registrazione
Il UDTF restituisce delle righe con uno schema di output che comprende una sequenza ordinata di nomi e tipi delle colonne. Se lo schema UDTF deve rimanere sempre lo stesso per tutte le query, è possibile specificare uno schema statico e fisso dopo l'@udtf
decorator. Deve essere un StructType
:
StructType().add("c1", StringType())
Oppure una stringa DDL che rappresenta un tipo di struct:
c1: string
Calcolare uno schema dinamico in fase di chiamata di funzione
Le funzioni definite dall'utente possono anche calcolare lo schema di output a livello di codice per ogni chiamata a seconda dei valori degli argomenti di input. A tale scopo, definire un metodo statico denominato analyze
che accetta zero o più parametri che corrispondono agli argomenti forniti alla chiamata UDTF specifica.
Ogni argomento del metodo analyze
è un'istanza della classe AnalyzeArgument
che contiene i campi seguenti:
campo classe AnalyzeArgument |
Descrizione |
---|---|
dataType |
Tipo del parametro di ingresso come DataType . Per gli argomenti della tabella di input, si tratta di un StructType che rappresenta le colonne della tabella. |
value |
Valore dell'argomento di input come Optional[Any] . Si tratta di None per argomenti di tabella o argomenti scalari letterali che non sono costanti. |
isTable |
Indica se l'argomento di input è una tabella come BooleanType . |
isConstantExpression |
Indica se l'argomento di input è un'espressione costante pieghevole come BooleanType . |
Il metodo analyze
restituisce un'istanza della classe AnalyzeResult
, che include lo schema della tabella dei risultati come StructType
più alcuni campi facoltativi. Se UDTF accetta un argomento di tabella di input, il AnalyzeResult
può includere anche un modo richiesto per partizionare e ordinare le righe della tabella di input in diverse chiamate UTF, come descritto più avanti.
campo classe AnalyzeResult |
Descrizione |
---|---|
schema |
Schema della tabella dei risultati come StructType . |
withSinglePartition |
Indica se inviare tutte le righe di input alla stessa istanza della classe UDTF come un BooleanType . |
partitionBy |
Se impostato su non vuoto, tutte le righe con ogni combinazione univoca di valori delle espressioni di partizionamento vengono utilizzate da un'istanza separata della classe UDTF. |
orderBy |
Se impostato su non vuoto, specifica un ordinamento di righe all'interno di ogni partizione. |
select |
Se impostato su non vuoto, questa è una sequenza di espressioni che l'UDTF specifica a Catalyst di valutare rispetto alle colonne nell'argomento TABLE di input. L'UDTF riceve un attributo di input per ciascun nome nell'elenco nell'ordine in cui sono elencati. |
Questo esempio analyze
restituisce una colonna di output per ogni parola nella stringa di input.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Trasmetti lo stato alle chiamate future eval
Il metodo analyze
può fungere da luogo pratico per eseguire l'inizializzazione e quindi inoltrare i risultati alle future chiamate al metodo eval
per la stessa chiamata UDTF.
A tale scopo, creare una sottoclasse di AnalyzeResult
e restituire un'istanza della sottoclasse dal metodo analyze
.
Aggiungere quindi un argomento aggiuntivo al metodo __init__
per accettare tale istanza.
Questo esempio analyze
restituisce uno schema di output costante, ma aggiunge informazioni personalizzate nei metadati dei risultati da utilizzare da chiamate future __init__
metodo:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Generare righe di output
Il metodo eval
viene eseguito una sola volta per ogni riga dell'argomento tabella di input (o una sola volta se non viene specificato alcun argomento di tabella), seguito da una chiamata del metodo terminate
alla fine. Il metodo restituisce zero o più righe conformi allo schema dei risultati producendo tuple, elenchi o oggetti pyspark.sql.Row
.
In questo esempio, una tupla di tre elementi restituirà una riga.
def eval(self, x, y, z):
yield (x, y, z)
È anche possibile omettere le parentesi:
def eval(self, x, y, z):
yield x, y, z
Aggiungere una virgola finale per restituire una riga con una sola colonna:
def eval(self, x, y, z):
yield x,
È anche possibile produrre un oggetto pyspark.sql.Row
.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
Questo esempio restituisce righe di output dal metodo terminate
usando un elenco Python. È possibile archiviare lo stato all'interno della classe dai passaggi precedenti nella valutazione UDTF per questo scopo.
def terminate(self):
yield [self.x, self.y, self.z]
Passare argomenti scalari a una UDTF (funzione definita dall'utente)
È possibile passare argomenti scalari a un UDTF come espressioni costanti che comprendono valori letterali o funzioni basate su di essi. Per esempio:
SELECT * FROM udtf(42, group => upper("finance_department"));
Passare argomenti di tabella a una funzione di tabella definita dall'utente (UDTF)
Le funzioni tabellari definite dall'utente (UDTF) in Python possono accettare una tabella di input come argomento, oltre agli argomenti di input scalari. Un singolo tipo definito dall'utente può anche accettare un argomento di tabella e più argomenti scalari.
Qualsiasi query SQL può quindi fornire una tabella di input usando la parola chiave TABLE
seguita da parentesi che circondano un identificatore di tabella appropriato, ad esempio TABLE(t)
. In alternativa, è possibile passare una sottoquery di tabella, ad esempio TABLE(SELECT a, b, c FROM t)
o TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
L'argomento tabella di input viene quindi rappresentato come argomento pyspark.sql.Row
al metodo eval
, con una chiamata al metodo eval
per ogni riga della tabella di input. È possibile usare annotazioni standard dei campi di colonna PySpark per interagire con le colonne in ogni riga. L'esempio seguente illustra l'importazione esplicita del tipo Row
di PySpark e quindi filtrando la tabella passata sul campo id
.
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Per eseguire una query sulla funzione, usare la parola chiave SQL TABLE
:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Specificare un partizionamento delle righe di input dalle chiamate di funzione.
Quando si chiama una UDTF con un argomento di tabella, qualsiasi query SQL può partizionare la tabella di input in diverse chiamate UDTF in base ai valori di una o più colonne della tabella di input.
Per specificare una partizione, usare la clausola PARTITION BY
nella chiamata di funzione dopo l'argomento TABLE
.
Ciò garantisce che tutte le righe di input con ogni combinazione univoca di valori delle colonne di partizionamento vengano utilizzate da una sola istanza della classe UDTF.
Si noti che oltre ai riferimenti a colonne semplici, la clausola PARTITION BY
accetta anche espressioni arbitrarie basate sulle colonne della tabella di input. Ad esempio, è possibile specificare il LENGTH
di una stringa, estrarre un mese da una data o concatenare due valori.
È anche possibile specificare WITH SINGLE PARTITION
anziché PARTITION BY
per richiedere una sola partizione in cui tutte le righe di input devono essere utilizzate da una sola istanza della classe UDTF.
All'interno di ogni partizione, è possibile specificare facoltativamente un ordinamento delle righe di input obbligatorio per il consumo da parte del metodo eval
dell'UDTF. A tale scopo, specificare una clausola ORDER BY
dopo la clausola PARTITION BY
o WITH SINGLE PARTITION
descritta in precedenza.
Si consideri, ad esempio, il seguente UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
È possibile specificare le opzioni di partizionamento quando si chiama la UDTF sulla tabella di input in diversi modi.
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Specificare un partizionamento delle righe di input dal metodo analyze
Si noti che per ognuno dei modi precedenti di partizionamento della tabella di input quando si chiamano funzioni definite dall'utente nelle query SQL, esiste un modo corrispondente per il metodo analyze
della UDTF per specificare lo stesso metodo di partizionamento automaticamente.
- Anziché chiamare una UDTF come
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, è possibile aggiornare il metodoanalyze
per impostare il campopartitionBy=[PartitioningColumn("a")]
e semplicemente chiamare la UDTF usandoSELECT * FROM udtf(TABLE(t))
. - Con lo stesso token, invece di specificare
TABLE(t) WITH SINGLE PARTITION ORDER BY b
nella query SQL, è possibile impostareanalyze
i campiwithSinglePartition=true
eorderBy=[OrderingColumn("b")]
e quindi passare semplicementeTABLE(t)
. - Anziché passare
TABLE(SELECT a FROM t)
nella query SQL, è possibile impostareanalyze
select=[SelectedColumn("a")]
e quindi passare semplicementeTABLE(t)
.
Nell'esempio seguente, analyze
restituisce uno schema di output costante, seleziona un sottoinsieme di colonne dalla tabella di input e specifica che la tabella di input viene partizionata in diverse chiamate UDTF basato sui valori della colonna date
.
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])