Funções escalares definidas pelo usuário - Python
Este artigo contém exemplos de função definida pelo usuário (UDF) do Python. Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões no Spark SQL.
No Databricks Runtime 14.0 e superior, você pode usar funções de tabela definidas pelo usuário (UDTFs) do Python para registrar funções que retornam relações inteiras em vez de valores escalares. Consulte funções de tabela definidas pelo usuário (UDTFs) do Python.
Nota
No Databricks Runtime 12.2 LTS e versões anteriores, UDFs Python e UDFs Pandas não são suportados na computação do Unity Catalog que usa o modo de acesso partilhado. UDFs Python escalares e UDFs Pandas são suportados no Databricks Runtime 13.3 LTS e acima para todos os modos de acesso.
No Databricks Runtime 13.3 LTS e superior, você pode registrar UDFs Python escalares no Unity Catalog usando sintaxe SQL. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.
Registrar uma função como UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Opcionalmente, você pode definir o tipo de retorno do seu UDF. O tipo de retorno padrão é StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Chamar o UDF no Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Usar UDF com DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Como alternativa, você pode declarar a mesma UDF usando a sintaxe de anotação:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Ordem de avaliação e verificação nula
O Spark SQL (incluindo SQL e a API DataFrame e Dataset) não garante a ordem de avaliação das subexpressões. Em particular, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, a lógica AND
e OR
as expressões não têm semântica de "curto-circuito" da esquerda para a direita.
Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação das expressões booleanas, e na ordem das e WHERE
cláusulas, uma vez que tais expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da HAVING
consulta. Especificamente, se um UDF depende de semântica de curto-circuito no SQL para verificação nula, não há garantia de que a verificação nula acontecerá antes de invocar o UDF. Por exemplo,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Esta WHERE
cláusula não garante que o strlen
UDF seja invocado após filtrar nulos.
Para executar a verificação nula adequada, recomendamos que você siga um destes procedimentos:
- Tornar a própria UDF nula e fazer verificação nula dentro da própria UDF
- Use
IF
ouCASE WHEN
expressões para fazer a verificação nula e invocar a UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Limitações
As seguintes limitações se aplicam aos UDFs do PySpark:
Restrições de importação de módulos: UDFs do PySpark em clusters compartilhados e computação sem servidor não podem acessar pastas Git, arquivos de espaço de trabalho ou volumes do catálogo Unity para importar módulos no Databricks Runtime 14.2 e inferior.
Variáveis de difusão: UDFs do PySpark em clusters compartilhados e computação sem servidor não suportam variáveis de transmissão.
Limite de memória: UDFs PySpark em computação sem servidor têm um limite de memória de 1GB por UDF PySpark. Exceder esse limite resulta no seguinte erro:
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.