Door de gebruiker gedefinieerde functies in Databricks Verbinding maken voor Scala
Notitie
Dit artikel bevat informatie over Databricks Verbinding maken voor Databricks Runtime 14.1 en hoger.
In dit artikel wordt beschreven hoe u door de gebruiker gedefinieerde functies uitvoert met Databricks Verbinding maken voor Scala. Met Databricks Verbinding maken kunt u populaire IDE's, notebookservers en aangepaste toepassingen verbinden met Azure Databricks-clusters. Zie voor de Python-versie van dit artikel door de gebruiker gedefinieerde functies in Databricks Verbinding maken voor Python.
Notitie
Voordat u Databricks Verbinding maken gaat gebruiken, moet u de Databricks Verbinding maken-client instellen.
Voor Databricks Runtime 14.1 en hoger biedt Databricks Verbinding maken voor Scala ondersteuning voor het uitvoeren van door de gebruiker gedefinieerde functies (UDF's).
Als u een UDF wilt uitvoeren, moeten de gecompileerde klasse en JAR's die de UDF vereist, worden geüpload naar het cluster.
De addCompiledArtifacts()
API kan worden gebruikt om de gecompileerde klasse- en JAR-bestanden op te geven die moeten worden geüpload.
Notitie
De Scala die door de client wordt gebruikt, moet overeenkomen met de Scala-versie in het Azure Databricks-cluster. Als u de Scala-versie van het cluster wilt controleren, raadpleegt u de sectie 'Systeemomgeving' voor de Databricks Runtime-versie van het cluster in de releaseversie van Databricks Runtime.
Met het volgende Scala-programma stelt u een eenvoudige UDF in die waarden in een kolom kwadrateert.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
In het voorgaande voorbeeld, omdat de UDF volledig is opgenomenMain
, wordt alleen het gecompileerde artefact toegevoegd.Main
Als de UDF zich verspreidt over andere klassen of gebruikmaakt van externe bibliotheken (d.w.z. JAR's), moeten al deze bibliotheken ook worden opgenomen.
Wanneer de Spark-sessie al is geïnitialiseerd, kunnen verder gecompileerde klassen en JAR's worden geüpload met behulp van de spark.addArtifact()
API.
Notitie
Bij het uploaden van JAR's moeten alle transitieve afhankelijkheids-JAR's worden opgenomen voor upload. De API's voeren geen automatische detectie van transitieve afhankelijkheden uit.
Getypte gegevensset-API's
Hetzelfde mechanisme dat in de vorige sectie voor UDF's wordt beschreven, is ook van toepassing op getypte gegevensset-API's.
Met getypte gegevensset-API's kunt u transformaties uitvoeren, zoals toewijzing, filter en aggregaties op resulterende gegevenssets. Deze worden ook uitgevoerd zoals UDF's in het Databricks-cluster.
De volgende Scala-toepassing gebruikt de map()
API om een getal in een resultaatkolom te wijzigen in een voorvoegseltekenreeks.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Hoewel in dit voorbeeld de map()
API wordt gebruikt, is dit ook van toepassing op andere getypte gegevensset-API's, zoals filter()
mapPartitions()
, enzovoort.