Partager via


Fonctions de table définies par l’utilisateur (UDTF) Python

Important

Cette fonctionnalité est en Préversion publique dans Databricks Runtime 14.3 LTS et versions ultérieures.

Une fonction de table définie par l’utilisateur (UDTF) vous permet d’inscrire des fonctions qui retournent des tables au lieu de valeurs scalaires. Contrairement aux fonctions scalaires qui retournent une valeur de résultat unique à partir de chaque appel, chaque UDTF est appelé dans la clause FROM d’une instruction SQL et retourne une table entière en tant que sortie.

Chaque appel UDTF peut accepter zéro ou plusieurs arguments. Ces arguments peuvent être des expressions scalaires ou des arguments de table représentant des tables d’entrée entières.

Syntaxe UDTF de base

Apache Spark implémente des UDTF Python en tant que classes Python avec une méthode eval obligatoire qui utilise yield pour émettre des lignes de sortie.

Pour utiliser votre classe en tant qu’UDTF, vous devez importer la fonction udtf PySpark. Databricks recommande d’utiliser cette fonction comme élément décoratif et de spécifier explicitement les noms et les types de champs à l’aide de l’option returnType (sauf si la classe définit une méthode analyze comme décrit dans une section ultérieure).

L’UDTF suivante crée une table à l’aide d’une liste fixe de deux arguments entiers :

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Inscrire une UDTF

Les UDTF sont inscrites dans la session SparkSession locale et sont isolées au niveau du notebook ou du travail.

Vous ne pouvez pas inscrire d’UDTF en tant qu’objets dans le catalogue Unity, et les UDTF ne peuvent pas être utilisées avec des entrepôts SQL.

Vous pouvez inscrire une UDTF à la session SparkSession actuelle pour une utilisation dans les requêtes SQL avec la fonction spark.udtf.register(). Fournissez un nom pour la fonction SQL et la classe UDTF Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Appeler une UDTF inscrite

Une fois inscrite, vous pouvez utiliser l’UDTF dans SQL à l’aide de la commande magique %sql ou de la fonction spark.sql() :

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Utiliser Apache Arrow

Si votre UDTF reçoit une petite quantité de données en entrée, mais génère une grande table, Databricks recommande d’utiliser Apache Arrow. Vous pouvez l’activer en spécifiant le paramètre useArrow lors de la déclaration de l’UDTF :

@udtf(returnType="c1: int, c2: int", useArrow=True)

Listes d’arguments variables – *args et **kwargs

Vous pouvez utiliser la syntaxe *args ou **kwargs Python et implémenter la logique pour gérer un nombre non spécifié de valeurs d’entrée.

L’exemple suivant retourne le même résultat tout en vérifiant explicitement la longueur et les types d’entrée pour les arguments :

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Voici le même exemple, mais en utilisant des arguments de mot clé :

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Définir un schéma statique au moment de l’inscription

L’UDTF retourne des lignes avec un schéma de sortie comprenant une séquence ordonnée de noms et de types de colonnes. Si le schéma UDTF doit toujours rester le même pour toutes les requêtes, vous pouvez spécifier un schéma statique et fixe après l’élément décoratif @udtf. Il doit s’agir soit d’un StructType :

StructType().add("c1", StringType())

Soit d’une chaîne DDL représentant un type de struct :

c1: string

Calcul d’un schéma dynamique au moment de l’appel de fonction

Les UDTF peuvent également calculer le schéma de sortie par programmation pour chaque appel en fonction des valeurs des arguments d’entrée. Pour ce faire, définissez une méthode statique appelée analyze qui accepte zéro ou plusieurs paramètres correspondant aux arguments fournis à l’appel UDTF spécifique.

Chaque argument de la méthode analyze est une instance de la classe AnalyzeArgument qui contient les champs suivants :

Champ de classe AnalyzeArgument Description
dataType Type de l’argument d’entrée en tant que DataType. Pour les arguments de table d’entrée, il s’agit d’un StructType représentant les colonnes de la table.
value Valeur de l’argument d’entrée en tant que Optional[Any]. Il s’agit de None pour les arguments de table ou les arguments scalaires littéraux qui ne sont pas constants.
isTable Indique si l’argument d’entrée est une table en tant que BooleanType.
isConstantExpression Indique si l’argument d’entrée est une expression pliable constante en tant que BooleanType.

La méthode analyze retourne une instance de la classe AnalyzeResult, qui inclut le schéma de la table de résultats sous la forme d’un StructType ainsi que certains champs facultatifs. Si l’UDTF accepte un argument de table d’entrée, le AnalyzeResult peut également inclure une méthode demandée pour partitionner et classer les lignes de la table d’entrée sur plusieurs appels UDTF, comme décrit plus loin.

Champ de classe AnalyzeResult Description
schema Schéma de la table de résultats en tant que StructType.
withSinglePartition Indique s’il faut envoyer toutes les lignes d’entrée à la même instance de classe UDTF en tant que BooleanType.
partitionBy Si la valeur est non vide, toutes les lignes avec chaque combinaison unique de valeurs des expressions de partitionnement sont consommées par une instance distincte de la classe UDTF.
orderBy Si la valeur est non vide, cela spécifie un classement des lignes dans chaque partition.
select Si la valeur est non vide, il s’agit d’une séquence d’expressions que l’UDTF spécifie pour que Catalyst évalue les colonnes dans l’argument TABLE d’entrée. L’UDTF reçoit un attribut d’entrée pour chaque nom de la liste dans l’ordre dans lequel ils sont répertoriés.

Cet exemple analyze retourne une colonne de sortie pour chaque mot dans l’argument de chaîne d’entrée.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Transférer l’état vers les futurs appels eval

La méthode analyze peut servir d’emplacement pratique pour effectuer l’initialisation, puis transférer les résultats vers les futurs appels de méthode eval pour le même appel UDTF.

Pour ce faire, créez une sous-classe de AnalyzeResult et retournez une instance de la sous-classe à partir de la méthode analyze. Ensuite, ajoutez un argument supplémentaire à la méthode __init__ pour accepter cette instance.

Cet exemple analyze retourne un schéma de sortie constant, mais ajoute des informations personnalisées dans les métadonnées de résultat à consommer par les futurs appels de méthode __init__ :

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Générer des lignes de sortie

La méthode eval s’exécute une fois pour chaque ligne de l’argument de table d’entrée (ou une seule fois si aucun argument de table n’est fourni), suivie d’un appel de la méthode terminate à la fin. Peut importe la méthode, zéro ou plusieurs lignes conformes au schéma de résultat sont générées en produisant des tuples, des listes ou des objets pyspark.sql.Row.

Cet exemple retourne une ligne en fournissant un tuple de trois éléments :

def eval(self, x, y, z):
  yield (x, y, z)

Vous pouvez également omettre les parenthèses :

def eval(self, x, y, z):
  yield x, y, z

Ajoutez une virgule de fin pour retourner une ligne avec une seule colonne :

def eval(self, x, y, z):
  yield x,

Vous pouvez également générer un objet pyspark.sql.Row.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Cet exemple génère des lignes de sortie de la méthode terminate à l’aide d’une liste Python. À cette fin, vous pouvez stocker l’état à l’intérieur de la classe à partir des étapes précédentes de l’évaluation UDTF.

def terminate(self):
  yield [self.x, self.y, self.z]

Passer des arguments scalaires à une UDTF

Vous pouvez passer des arguments scalaires à une UDTF en tant qu’expressions constantes comprenant des valeurs littérales ou des fonctions basées sur celles-ci. Par exemple :

SELECT * FROM udtf(42, group => upper("finance_department"));

Passer des arguments de table à une UDTF

Les UDTF Python peuvent accepter une table d’entrée en tant qu’argument en plus des arguments d’entrée scalaires. Une seule UDTF peut également accepter un argument de table et plusieurs arguments scalaires.

Ensuite, toute requête SQL peut fournir une table d’entrée à l’aide du mot clé TABLE suivi de parenthèses entourant un identificateur de table approprié, comme TABLE(t). Vous pouvez également passer une sous-requête de table, comme TABLE(SELECT a, b, c FROM t) ou TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

L’argument de table d’entrée est ensuite représenté sous la forme d’un argument pyspark.sql.Row à la méthode eval, avec un appel à la méthode eval pour chaque ligne de la table d’entrée. Vous pouvez utiliser des annotations de champ de colonne PySpark standard pour interagir avec les colonnes de chaque ligne. L’exemple suivant illustre l’importation explicite du type Row PySpark, puis le filtrage de la table passée sur le champ id :

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Pour interroger la fonction, utilisez le mot clé SQL TABLE :

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Spécifier un partitionnement des lignes d’entrée à partir des appels de fonction

Lors de l’appel d’une UDTF avec un argument de table, toute requête SQL peut partitionner la table d’entrée sur plusieurs appels UDTF en fonction des valeurs d’une ou plusieurs colonnes de table d’entrée.

Pour spécifier une partition, utilisez la clause PARTITION BY dans l’appel de fonction après l’argument TABLE. Cela garantit que toutes les lignes d’entrée avec chaque combinaison unique de valeurs des colonnes de partitionnement seront consommées par une seule instance de la classe UDTF.

Notez qu’en plus des références de colonnes simples, la clause PARTITION BY accepte également des expressions arbitraires en fonction des colonnes de table d’entrée. Par exemple, vous pouvez spécifier la valeur LENGTH d’une chaîne, extraire un mois d’une date ou concaténer deux valeurs.

Il est également possible de spécifier WITH SINGLE PARTITION au lieu de PARTITION BY afin de demander une seule partition où toutes les lignes d’entrée doivent être consommées par une seule instance de la classe UDTF.

Dans chaque partition, vous pouvez éventuellement spécifier un classement obligatoire des lignes d’entrée, car la méthode eval de l’UDTF les consomme. Pour ce faire, fournissez une clause ORDER BY après la clause PARTITION BY ou WITH SINGLE PARTITION décrite ci-dessus.

Par exemple, prenons l’UDTF suivante :

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Vous pouvez spécifier des options de partitionnement lors de l’appel de l’UDTF sur la table d’entrée de plusieurs manière :

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Spécifier un partitionnement des lignes d’entrée à partir de la méthode analyze

Notez que pour chacune des méthodes ci-dessus de partitionnement de la table d’entrée lors de l’appel d’UDTF dans les requêtes SQL, il existe une manière correspondante pour la méthode analyze de l’UDTF de spécifier automatiquement la même méthode de partitionnement automatiquement.

  • Au lieu d’appeler une UDTF en tant que SELECT * FROM udtf(TABLE(t) PARTITION BY a), vous pouvez mettre à jour la méthode analyze pour définir le champ partitionBy=[PartitioningColumn("a")] et simplement appeler la fonction à l’aide de SELECT * FROM udtf(TABLE(t)).
  • De même, au lieu de spécifier TABLE(t) WITH SINGLE PARTITION ORDER BY b dans la requête SQL, vous pouvez faire en sorte que analyze définisse les champs withSinglePartition=true et orderBy=[OrderingColumn("b")], puis passe simplement TABLE(t).
  • Au lieu de passer TABLE(SELECT a FROM t) dans la requête SQL, vous pouvez laissez analyze définir select=[SelectedColumn("a")], puis passez TABLE(t).

Dans l’exemple suivant, analyze retourne un schéma de sortie constant, sélectionne un sous-ensemble de colonnes dans la table d’entrée et spécifie que la table d’entrée est partitionnée sur plusieurs appels UDTF en fonction des valeurs de la colonne date :

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])