Fonctions définies par l’utilisateur dans Databricks Connect pour Scala
Note
Cet article traite de Databricks Connect pour Databricks Runtime 14.1 et versions ultérieures.
Cet article explique comment exécuter des fonctions définies par l’utilisateur avec Databricks Connect pour Scala. Databricks Connect vous permet de connecter des IDE, des serveurs de notebooks et des applications personnalisées populaires aux clusters Azure Databricks. Pour obtenir la version Python de cet article, consultez fonctions définies par l’utilisateur dans Databricks Connect pour Python.
Note
Avant de commencer à utiliser Databricks Connect, vous devez set le client Databricks Connect.
Pour Databricks Runtime 14.1 et versions ultérieures, Databricks Connect pour Scala prend en charge l’exécution de fonctions définies par l’utilisateur (UDF).
Pour exécuter une fonction UDF, la classe compilée et les fichiers JAR requis par l’UDF doivent être chargés sur le cluster.
L’API addCompiledArtifacts()
peut être utilisée pour spécifier la classe compilée et les fichiers JAR qui doivent être chargés.
Note
La scala utilisée par le client doit correspondre à la version Scala sur le cluster Azure Databricks. Pour vérifier la version de Scala du cluster, consultez la section « Environnement système » de la version Databricks Runtime du cluster dans Notes de publication sur les versions et la compatibilité de Databricks Runtime.
Le programme Scala suivant configure une fonction UDF simple qui place les values dans une column.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
Dans l’exemple précédent, parce que la fonction UDF est entièrement contenue dans Main
, seul l’artefact compilé de Main
est ajouté.
Si la fonction UDF s’étend sur d’autres classes ou utilise des bibliothèques externes (c’est-à-dire, jars), toutes ces bibliothèques doivent également être incluses.
Lorsque la session Spark est déjà initialisée, des classes compilées et des fichiers JAR supplémentaires peuvent être chargés à l’aide de l’API spark.addArtifact()
.
Remarque
Lors de l'envoi de JARs, tous les JARs de dépendance transitive doivent être inclus dans le téléchargement. Les API n’effectuent aucune détection automatique des dépendances transitives.
API de jeux de données typés
Le même mécanisme décrit dans la section précédente pour les UDF s’applique également aux API de jeux de données typés.
Les API de jeu de données typées permettent d’exécuter des transformations telles que des mappages, des filtres et des agrégations sur les jeux de données résultants. Elles sont également exécutées de la même façon que les fonctions UDF sur le cluster Databricks.
L’application Scala suivante utilise l’API map()
pour modifier un nombre dans un résultat column à une chaîne préfixée.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Bien que cet exemple utilise l’API map()
, cela s’applique également à d’autres API de jeu de données typées telles que filter()
, mapPartitions()
, etc.