Partilhar via


Funções de tabela definidas pelo usuário Python (UDTFs)

Importante

Esse recurso está no Public Preview no Databricks Runtime 14.3 LTS e superior.

Uma função de tabela definida pelo usuário (UDTF) permite registrar funções que retornam tabelas em vez de valores escalares. Ao contrário das funções escalares que retornam um único valor de resultado de cada chamada, cada UDTF é invocado na cláusula de FROM uma instrução SQL e retorna uma tabela inteira como saída.

Cada chamada UDTF pode aceitar zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos de tabela que representam tabelas de entrada inteiras.

Sintaxe UDTF básica

O Apache Spark implementa UDTFs Python como classes Python com um método obrigatório eval que usa yield para emitir linhas de saída.

Para usar sua classe como UDTF, você deve importar a função PySpark udtf . Databricks recomenda usar essa função como decorador e especificar explicitamente nomes e tipos de campo usando a returnType opção (a menos que a classe defina um analyze método conforme descrito em uma seção posterior).

O UDTF a seguir cria uma tabela usando uma lista fixa de dois argumentos inteiros:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Registar uma UDTF

As UDTFs são registradas no local SparkSession e são isoladas no nível do bloco de anotações ou do trabalho.

Não é possível registrar UDTFs como objetos no Unity Catalog, e UDTFs não podem ser usados com armazéns SQL.

Você pode registrar um UDTF para o atual SparkSession para uso em consultas SQL com a função spark.udtf.register(). Forneça um nome para a função SQL e a classe UDTF do Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Chamar um UDTF registrado

Uma vez registrado, você pode usar o UDTF em SQL usando o comando magic ou spark.sql() a %sql função:

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Usar seta Apache

Se o seu UDTF recebe uma pequena quantidade de dados como entrada, mas produz uma tabela grande, o Databricks recomenda o uso da Seta Apache. Você pode habilitá-lo especificando o useArrow parâmetro ao declarar o UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Listas de argumentos variáveis - *args e **kwargs

Você pode usar Python *args ou **kwargs sintaxe e implementar lógica para manipular um número não especificado de valores de entrada.

O exemplo a seguir retorna o mesmo resultado enquanto verifica explicitamente o comprimento e os tipos de entrada para os argumentos:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Aqui está o mesmo exemplo, mas usando argumentos de palavra-chave:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Definir um esquema estático no momento do registro

O UDTF retorna linhas com um esquema de saída que compreende uma sequência ordenada de nomes e tipos de coluna. Se o esquema UDTF deve permanecer sempre o mesmo para todas as consultas, você pode especificar um esquema estático e fixo após o @udtf decorador. Deve ser um StructType:

StructType().add("c1", StringType())

Ou uma cadeia de caracteres DDL que representa um tipo struct:

c1: string

Calcular um esquema dinâmico no momento da chamada da função

UDTFs também podem calcular o esquema de saída programaticamente para cada chamada, dependendo dos valores dos argumentos de entrada. Para fazer isso, defina um método estático chamado analyze que aceite zero ou mais parâmetros que correspondam aos argumentos fornecidos para a chamada UDTF específica.

Cada argumento do analyze método é uma instância da AnalyzeArgument classe que contém os seguintes campos:

AnalyzeArgument Campo de classe Description
dataType O tipo do argumento de entrada como um DataTypearquivo . Para argumentos de StructType tabela de entrada, isso representa as colunas da tabela.
value O valor do argumento de entrada como um Optional[Any]arquivo . Isso é None para argumentos de tabela ou argumentos escalares literais que não são constantes.
isTable Se o argumento de entrada é uma tabela como um BooleanTypearquivo .
isConstantExpression Se o argumento input é uma expressão dobrável constante como um BooleanTypearquivo .

O analyze método retorna uma instância da classe, que inclui o esquema da tabela de AnalyzeResult resultados como um StructType mais alguns campos opcionais. Se o UDTF aceitar um argumento de tabela de entrada, o AnalyzeResult também pode incluir uma maneira solicitada de particionar e ordenar as linhas da tabela de entrada em várias chamadas UDTF, conforme descrito mais adiante.

AnalyzeResult Campo de classe Description
schema O esquema da tabela de resultados como um StructTypearquivo .
withSinglePartition Se todas as linhas de entrada devem ser enviadas para a mesma instância de classe UDTF como um BooleanTypearquivo .
partitionBy Se definido como não vazio, todas as linhas com cada combinação exclusiva de valores das expressões de particionamento são consumidas por uma instância separada da classe UDTF.
orderBy Se definido como não vazio, isso especifica uma ordem de linhas dentro de cada partição.
select Se definido como não vazio, esta é uma sequência de expressões que o UDTF está especificando para o Catalyst avaliar em relação às colunas no argumento TABLE de entrada. O UDTF recebe um atributo de entrada para cada nome na lista na ordem em que são listados.

Este analyze exemplo retorna uma coluna de saída para cada palavra no argumento input string.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

Encaminhar estado para chamadas futuras eval

O analyze método pode servir como um local conveniente para executar a inicialização e, em seguida, encaminhar os resultados para invocações de método futuras eval para a mesma chamada UDTF.

Para fazer isso, crie uma subclasse de AnalyzeResult e retorne uma instância da subclasse do analyze método. Em seguida, adicione um argumento adicional ao __init__ método para aceitar essa instância.

Este analyze exemplo retorna um esquema de saída constante, mas adiciona informações personalizadas nos metadados de resultado a serem consumidos por chamadas de método futuras __init__ :

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Linhas de saída de rendimento

O eval método é executado uma vez para cada linha do argumento da tabela de entrada (ou apenas uma vez se nenhum argumento de tabela for fornecido), seguido por uma invocação do terminate método no final. Qualquer método produz zero ou mais linhas que estão em conformidade com o esquema de resultados produzindo tuplas, listas ou pyspark.sql.Row objetos.

Este exemplo retorna uma linha fornecendo uma tupla de três elementos:

def eval(self, x, y, z):
  yield (x, y, z)

Você também pode omitir os parênteses:

def eval(self, x, y, z):
  yield x, y, z

Adicione uma vírgula à direita para retornar uma linha com apenas uma coluna:

def eval(self, x, y, z):
  yield x,

Você também pode produzir um pyspark.sql.Row objeto.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Este exemplo produz linhas de saída do terminate método usando uma lista Python. Você pode armazenar o estado dentro da classe de etapas anteriores na avaliação UDTF para essa finalidade.

def terminate(self):
  yield [self.x, self.y, self.z]

Passar argumentos escalares para uma UDTF

Você pode passar argumentos escalares para um UDTF como expressões constantes que compreendem valores literais ou funções baseadas neles. Por exemplo:

SELECT * FROM udtf(42, group => upper("finance_department"));

Passar argumentos de tabela para um UDTF

UDTFs Python podem aceitar uma tabela de entrada como um argumento, além de argumentos de entrada escalares. Um único UDTF também pode aceitar um argumento de tabela e vários argumentos escalares.

Em seguida, qualquer consulta SQL pode fornecer uma tabela de entrada usando a TABLE palavra-chave seguida de parênteses ao redor de um identificador de tabela apropriado, como TABLE(t). Como alternativa, você pode passar uma subconsulta de tabela, como TABLE(SELECT a, b, c FROM t) ou TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

O argumento da tabela de entrada é então representado como um pyspark.sql.Row argumento para o eval método, com uma chamada para o eval método para cada linha na tabela de entrada. Você pode usar anotações de campo de coluna padrão do PySpark para interagir com colunas em cada linha. O exemplo a seguir demonstra explicitamente importar o tipo PySpark Row e, em seguida, filtrar a tabela passada no id campo:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Para consultar a função, use a TABLE palavra-chave SQL:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Especificar um particionamento das linhas de entrada de chamadas de função

Ao chamar um UDTF com um argumento de tabela, qualquer consulta SQL pode particionar a tabela de entrada em várias chamadas UDTF com base nos valores de uma ou mais colunas da tabela de entrada.

Para especificar uma partição, use a PARTITION BY cláusula na chamada de função após o TABLE argumento. Isso garante que todas as linhas de entrada com cada combinação exclusiva de valores das colunas de particionamento serão consumidas por exatamente uma instância da classe UDTF.

Observe que, além de referências de coluna simples, a PARTITION BY cláusula também aceita expressões arbitrárias baseadas em colunas da tabela de entrada. Por exemplo, você pode especificar o LENGTH de uma cadeia de caracteres, extrair um mês de uma data ou concatenar dois valores.

Também é possível especificar WITH SINGLE PARTITION em vez de PARTITION BY solicitar apenas uma partição em que todas as linhas de entrada devem ser consumidas por exatamente uma instância da classe UDTF.

Dentro de cada partição, você pode, opcionalmente, especificar uma ordenação necessária das linhas de entrada à medida que eval o método UDTF as consome. Para isso, forneça uma ORDER BY cláusula após a PARTITION BY cláusula descrita WITH SINGLE PARTITION acima.

Por exemplo, considere o seguinte UDTF:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Você pode especificar opções de particionamento ao chamar o UDTF sobre a tabela de entrada de várias maneiras:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Especificar um particionamento das linhas de entrada do analyze método

Observe que para cada uma das maneiras acima de particionar a tabela de entrada ao chamar UDTFs em consultas SQL, há uma maneira correspondente para o método UDTF analyze especificar o mesmo método de particionamento automaticamente.

  • Em vez de chamar um UDTF como SELECT * FROM udtf(TABLE(t) PARTITION BY a), você pode atualizar o analyze método para definir o campo partitionBy=[PartitioningColumn("a")] e simplesmente chamar a função usando SELECT * FROM udtf(TABLE(t)).
  • Da mesma forma, em vez de especificar TABLE(t) WITH SINGLE PARTITION ORDER BY b na consulta SQL, você pode fazer analyze definir os campos withSinglePartition=true e orderBy=[OrderingColumn("b")] , em seguida, apenas passar TABLE(t).
  • Em vez de passar TABLE(SELECT a FROM t) a consulta SQL, você pode fazer analyze set select=[SelectedColumn("a")] e, em seguida, apenas passar TABLE(t).

No exemplo a seguir, analyze retorna um esquema de saída constante, seleciona um subconjunto de colunas da tabela de entrada e especifica que a tabela de entrada é particionada em várias chamadas UDTF com base nos valores da date coluna:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])