Partilhar via


Funções definidas pelo usuário no Databricks Connect for Scala

Observação

Este artigo aborda o Databricks Connect for Databricks Runtime 14.1 e superior.

Este artigo descreve como executar funções definidas pelo usuário com o Databricks Connect for Scala. O Databricks Connect permite conectar IDEs populares, servidores de notebook e aplicativos personalizados a clusters do Azure Databricks. Para a versão Python deste artigo, consulte funções definidas pelo usuário no Databricks Connect for Python.

Observação

Antes de começar a usar o Databricks Connect, você deve configurar o cliente Databricks Connect.

Para o Databricks Runtime 14.1 e superior, o Databricks Connect for Scala suporta a execução de funções definidas pelo usuário (UDFs).

Para executar um UDF, a classe compilada e os JARs que o UDF requer devem ser carregados no cluster. A API addCompiledArtifacts() pode ser usada para especificar a classe compilada e os arquivos JAR que devem ser carregados.

Observação

O Scala usado pelo cliente deve corresponder à versão do Scala no cluster do Azure Databricks. Para verificar a versão do Scala do cluster, consulte a seção "Ambiente do Sistema" para a versão do Runtime do Databricks do cluster nas notas de lançamento do Databricks Runtime e a sua compatibilidade.

O programa Scala a seguir configura um UDF simples que quadra valores em uma coluna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

No exemplo anterior, como o UDF está totalmente contido em Main, somente o artefato compilado de Main é adicionado. Se o UDF se espalhar por outras classes ou usar bibliotecas externas (ou seja, JARs), todas essas bibliotecas também devem ser incluídas.

Quando a sessão do Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando a API spark.addArtifact().

Observação

Ao carregar JARs, é necessário incluir todos os JARs de dependência transitiva para o carregamento. As APIs não executam nenhuma deteção automática de dependências transitivas.

APIs de conjunto de dados tipadas

O mesmo mecanismo descrito na seção anterior para UDFs também se aplica a APIs de conjunto de dados tipadas.

As APIs de conjunto de dados tipadas permitem executar transformações como mapa, filtro e agregações em conjuntos de dados resultantes. Eles também são executados de forma semelhante a UDFs no cluster Databricks.

O aplicativo Scala a seguir usa a API map() para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Embora este exemplo use a API map(), isso também se aplica a outras APIs de conjunto de dados tipadas, como filter(), mapPartitions(), etc.