Funções definidas pelo usuário no Databricks Connect para Scala
Observação
Este artigo aborda o Databricks Connect para Databricks Runtime 14.1 e versões superiores.
Este artigo descreve como usar executar funções definidas pelo usuário com o Databricks Connect para Scala. O Databricks Connect permite que você conecte IDEs, servidores de notebook populares e aplicativos personalizados aos clusters do Azure Databricks. Para ler a versão do Python deste artigo, confira Funções definidas pelo usuário com o Databricks Connect para Python.
Observação
Antes de começar a utilizar o Databricks Connect, você precisa configurar o cliente do Databricks Connect.
No caso do Databricks Runtime 14.1 e versões superiores, o Databricks Connect para Scala dá suporte à execução de UDFs (funções definidas pelo usuário).
Para executar uma UDF, a classe compilada e os JARs necessários para a UDF precisam ser carregados no cluster.
A API addCompiledArtifacts()
pode ser usada para especificar a classe compilada e os arquivos JAR que precisam ser carregados.
Observação
O Scala usado pelo cliente precisa corresponder à versão do Scala no cluster do Azure Databricks. Para verificar a versão do Scala do cluster, confira a seção “Ambiente do sistema” da versão do Databricks Runtime do cluster em Versões e compatibilidade das notas sobre a versão do Databricks Runtime.
O programa Scala a seguir configura uma UDF simples que eleva ao quadrado os valores em uma coluna.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
No exemplo anterior, como a UDF está totalmente contida em Main
, somente o artefato compilado de Main
é adicionado.
Se a UDF for distribuída por outras classes ou usar bibliotecas externas (ou seja, JARs), todas essas bibliotecas também deverão ser incluídas.
Quando a sessão do Spark já estiver inicializada, outras classes compiladas e os JARs poderão ser carregados por meio da API spark.addArtifact()
.
Observação
Ao carregar JARs, todos os JARs de dependência transitiva precisam ser incluídos para upload. As APIs não executam nenhuma detecção automática de dependências transitivas.
APIs do conjunto de dados tipado
O mesmo mecanismo descrito na seção anterior para UDFs também se aplica às APIs do conjunto de dados tipado.
As APIs do conjunto de dados tipado permitem executar transformações como mapa, filtro e agregações nos conjuntos de dados resultantes. Elas também são executadas de forma semelhante às UDFs no cluster do Databricks.
O aplicativo Scala a seguir usa a API map()
para modificar um número em uma coluna de resultado para uma cadeia de caracteres prefixada.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Embora este exemplo use a API map()
, ele também se aplica a outras APIs do conjunto de dados tipado, como filter()
, mapPartitions()
etc.