Funções definidas pelo usuário no Databricks Connect for Scala
Observação
Este artigo aborda o Databricks Connect for Databricks Runtime 14.1 e superior.
Este artigo descreve como executar funções definidas pelo usuário com o Databricks Connect for Scala. O Databricks Connect permite conectar IDEs populares, servidores de notebook e aplicativos personalizados a clusters do Azure Databricks. Para a versão Python deste artigo, consulte funções definidas pelo usuário no Databricks Connect for Python.
Observação
Antes de começar a usar o Databricks Connect, você deve configurar o cliente Databricks Connect.
Para o Databricks Runtime 14.1 e superior, o Databricks Connect for Scala suporta a execução de funções definidas pelo usuário (UDFs).
Para executar um UDF, a classe compilada e os JARs que o UDF requer devem ser carregados no cluster.
A API addCompiledArtifacts()
pode ser usada para especificar a classe compilada e os arquivos JAR que devem ser carregados.
Observação
O Scala usado pelo cliente deve corresponder à versão do Scala no cluster do Azure Databricks. Para verificar a versão do Scala do cluster, consulte a seção "Ambiente do Sistema" para a versão do Runtime do Databricks do cluster nas notas de lançamento do Databricks Runtime e a sua compatibilidade.
O programa Scala a seguir configura um UDF simples que quadra valores em uma coluna.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
No exemplo anterior, como o UDF está totalmente contido em Main
, somente o artefato compilado de Main
é adicionado.
Se o UDF se espalhar por outras classes ou usar bibliotecas externas (ou seja, JARs), todas essas bibliotecas também devem ser incluídas.
Quando a sessão do Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando a API spark.addArtifact()
.
Observação
Ao carregar JARs, é necessário incluir todos os JARs de dependência transitiva para o carregamento. As APIs não executam nenhuma deteção automática de dependências transitivas.
APIs de conjunto de dados tipadas
O mesmo mecanismo descrito na seção anterior para UDFs também se aplica a APIs de conjunto de dados tipadas.
As APIs de conjunto de dados tipadas permitem executar transformações como mapa, filtro e agregações em conjuntos de dados resultantes. Eles também são executados de forma semelhante a UDFs no cluster Databricks.
O aplicativo Scala a seguir usa a API map()
para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Embora este exemplo use a API map()
, isso também se aplica a outras APIs de conjunto de dados tipadas, como filter()
, mapPartitions()
, etc.