Dela via


Användardefinierade funktioner i Databricks Connect för Scala

Note

Den här artikeln beskriver Databricks Connect för Databricks Runtime 14.1 och senare.

I den här artikeln beskrivs hur du kör användardefinierade funktioner med Databricks Connect för Scala. Med Databricks Connect kan du ansluta populära IDE:er, notebook-servrar och anpassade program till Azure Databricks-kluster. Python-versionen av den här artikeln finns i Användardefinierade funktioner i Databricks Connect för Python.

Note

Innan du börjar använda Databricks Connect måste du konfigurera Databricks Connect-klienten.

För Databricks Runtime 14.1 och senare har Databricks Connect för Scala stöd för att köra användardefinierade funktioner (UDF:er).

För att kunna köra en UDF måste den kompilerade klassen och JAR:erna som krävs för UDF laddas upp till klustret. Det addCompiledArtifacts() API:et kan användas för att ange den kompilerade klassen och JAR-filer som måste laddas upp.

Note

Den Scala som används av klienten måste matcha Scala-versionen i Azure Databricks-klustret. Information om hur du kontrollerar klustrets Scala-version finns i avsnittet "Systemmiljö" för klustrets Databricks Runtime-version i Databricks Runtime release notes och kompatibilitet.

Följande Scala-program konfigurerar en enkel UDF som kvadraterar värden i en kolumn.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

Eftersom UDF i föregående exempel är helt inneslutet i Mainläggs endast den kompilerade artefakten för Main till. Om UDF sprids över andra klasser eller använder externa bibliotek (d.v.s. JAR:er) bör alla dessa bibliotek också inkluderas.

När Spark-sessionen redan har initierats kan ytterligare kompilerade klasser och JAR:er laddas upp med hjälp av spark.addArtifact()-API:et.

Note

När du laddar upp JAR:er måste alla transitiva beroende-JAR:er inkluderas för uppladdning. API:erna utför ingen automatisk identifiering av transitiva beroenden.

Api:er för typade datauppsättningar

Samma mekanism som beskrivs i föregående avsnitt för UDF:er gäller även för typerade API:er för datauppsättningar.

Med inskrivna API:er för datauppsättningar kan en köra transformeringar som mappning, filter och aggregeringar på resulterande datauppsättningar. Dessa körs också på samma sätt som UDF:er i Databricks-klustret.

Följande Scala-program använder map() API för att ändra ett tal i en resultatkolumn till en prefixsträng.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

I det här exemplet används api:et map(), men detta gäller även för andra typerade API:er för datauppsättningar, till exempel filter(), mapPartitions()osv.