Användardefinierade funktioner i Databricks Connect för Scala
Note
Den här artikeln beskriver Databricks Connect för Databricks Runtime 14.1 och senare.
I den här artikeln beskrivs hur du kör användardefinierade funktioner med Databricks Connect för Scala. Med Databricks Connect kan du ansluta populära IDE:er, notebook-servrar och anpassade program till Azure Databricks-kluster. Python-versionen av den här artikeln finns i Användardefinierade funktioner i Databricks Connect för Python.
Note
Innan du börjar använda Databricks Connect måste du konfigurera Databricks Connect-klienten.
För Databricks Runtime 14.1 och senare har Databricks Connect för Scala stöd för att köra användardefinierade funktioner (UDF:er).
För att kunna köra en UDF måste den kompilerade klassen och JAR:erna som krävs för UDF laddas upp till klustret.
Det addCompiledArtifacts()
API:et kan användas för att ange den kompilerade klassen och JAR-filer som måste laddas upp.
Note
Den Scala som används av klienten måste matcha Scala-versionen i Azure Databricks-klustret. Information om hur du kontrollerar klustrets Scala-version finns i avsnittet "Systemmiljö" för klustrets Databricks Runtime-version i Databricks Runtime release notes och kompatibilitet.
Följande Scala-program konfigurerar en enkel UDF som kvadraterar värden i en kolumn.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
Eftersom UDF i föregående exempel är helt inneslutet i Main
läggs endast den kompilerade artefakten för Main
till.
Om UDF sprids över andra klasser eller använder externa bibliotek (d.v.s. JAR:er) bör alla dessa bibliotek också inkluderas.
När Spark-sessionen redan har initierats kan ytterligare kompilerade klasser och JAR:er laddas upp med hjälp av spark.addArtifact()
-API:et.
Note
När du laddar upp JAR:er måste alla transitiva beroende-JAR:er inkluderas för uppladdning. API:erna utför ingen automatisk identifiering av transitiva beroenden.
Api:er för typade datauppsättningar
Samma mekanism som beskrivs i föregående avsnitt för UDF:er gäller även för typerade API:er för datauppsättningar.
Med inskrivna API:er för datauppsättningar kan en köra transformeringar som mappning, filter och aggregeringar på resulterande datauppsättningar. Dessa körs också på samma sätt som UDF:er i Databricks-klustret.
Följande Scala-program använder map()
API för att ändra ett tal i en resultatkolumn till en prefixsträng.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
I det här exemplet används api:et map()
, men detta gäller även för andra typerade API:er för datauppsättningar, till exempel filter()
, mapPartitions()
osv.