Partilhar via


Funções definidas pelo usuário no Databricks Connect for Scala

Nota

Este artigo aborda o Databricks Connect for Databricks Runtime 14.1 e superior.

Este artigo descreve como executar funções definidas pelo usuário com o Databricks Connect for Scala. O Databricks Connect permite conectar IDEs populares, servidores de notebook e aplicativos personalizados a clusters do Azure Databricks. Para a versão Python deste artigo, consulte Funções definidas pelo usuário no Databricks Connect for Python.

Nota

Antes de começar a usar o Databricks Connect, você deve configurar o cliente Databricks Connect.

Para o Databricks Runtime 14.1 e superior, o Databricks Connect for Scala suporta a execução de funções definidas pelo usuário (UDFs).

Para executar um UDF, a classe compilada e os JARs que o UDF requer devem ser carregados no cluster. A addCompiledArtifacts() API pode ser usada para especificar a classe compilada e os arquivos JAR que devem ser carregados.

Nota

O Scala usado pelo cliente deve corresponder à versão do Scala no cluster do Azure Databricks. Para verificar a versão do Scala do cluster, consulte a seção "Ambiente do sistema" para obter a versão do Databricks Runtime do cluster em Versões e compatibilidade das notas de versão do Databricks Runtime.

O programa Scala a seguir configura um UDF simples que quadra valores em uma coluna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

No exemplo anterior, como o UDF está totalmente contido no Main, somente o artefato compilado de é adicionado Main . Se o UDF se espalhar por outras classes ou usar bibliotecas externas (ou seja, JARs), todas essas bibliotecas também devem ser incluídas.

Quando a sessão do Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando a spark.addArtifact() API.

Nota

Ao carregar JARs, todos os JARs de dependência transitiva devem ser incluídos para upload. As APIs não executam nenhuma deteção automática de dependências transitivas.

APIs de conjunto de dados tipadas

O mesmo mecanismo descrito na seção anterior para UDFs também se aplica a APIs de conjunto de dados tipadas.

As APIs de conjunto de dados tipadas permitem executar transformações como mapa, filtro e agregações em conjuntos de dados resultantes. Eles também são executados de forma semelhante a UDFs no cluster Databricks.

O seguinte aplicativo Scala usa a map() API para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Embora este exemplo use a API, isso também se aplica a map() outras APIs de conjunto de dados tipadas, como filter(), , mapPartitions()etc.