Funkcje zdefiniowane przez użytkownika w usłudze Databricks Połączenie dla języka Scala
Uwaga
W tym artykule opisano usługę Databricks Połączenie dla środowiska Databricks Runtime w wersji 14.1 lub nowszej.
W tym artykule opisano sposób wykonywania funkcji zdefiniowanych przez użytkownika za pomocą usługi Databricks Połączenie dla języka Scala. Usługa Databricks Połączenie umożliwia łączenie popularnych środowisk IDE, serwerów notesów i aplikacji niestandardowych z klastrami usługi Azure Databricks. Aby zapoznać się z wersją języka Python tego artykułu, zobacz Funkcje zdefiniowane przez użytkownika w usłudze Databricks Połączenie dla języka Python.
Uwaga
Przed rozpoczęciem korzystania z usługi Databricks Połączenie należy skonfigurować klienta usługi Databricks Połączenie.
W przypadku środowiska Databricks Runtime 14.1 lub nowszego usługa Databricks Połączenie dla języka Scala obsługuje uruchamianie funkcji zdefiniowanych przez użytkownika (UDF).
Aby można było uruchomić funkcję zdefiniowanej przez użytkownika, skompilowana klasa i elementy JAR wymagane przez funkcję zdefiniowanej przez użytkownika muszą zostać przekazane do klastra.
Za addCompiledArtifacts()
pomocą interfejsu API można określić skompilowaną klasę i pliki JAR, które muszą zostać przekazane.
Uwaga
Język Scala używany przez klienta musi być zgodny z wersją języka Scala w klastrze usługi Azure Databricks. Aby sprawdzić wersję scala klastra, zobacz sekcję "Środowisko systemowe" dla wersji środowiska Databricks Runtime klastra w wersjach i zgodności środowiska Databricks Runtime.
Poniższy program Scala konfiguruje prostą funkcję zdefiniowaną przez użytkownika, która kwadratuje wartości w kolumnie.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
W poprzednim przykładzie, ponieważ funkcja zdefiniowana przez użytkownika jest w pełni zawarta w obiekcie Main
, dodawany jest tylko skompilowany artefakt Main
.
Jeśli funkcja UDF rozprzestrzenia się na inne klasy lub używa bibliotek zewnętrznych (tj. jednostek JAR), należy również uwzględnić wszystkie te biblioteki.
Po zainicjowaniu sesji platformy Spark można przekazać kolejne skompilowane klasy i elementy JAR przy użyciu interfejsu spark.addArtifact()
API.
Uwaga
Podczas przekazywania plików JAR wszystkie przechodnie zależności JAR muszą być dołączone do przekazywania. Interfejsy API nie wykonują żadnego automatycznego wykrywania zależności przechodnich.
Typizowane interfejsy API zestawu danych
Ten sam mechanizm opisany w poprzedniej sekcji funkcji zdefiniowanych przez użytkownika dotyczy również typowych interfejsów API zestawu danych.
Typizowane interfejsy API zestawu danych umożliwiają uruchamianie przekształceń, takich jak mapowanie, filtrowanie i agregacje w wynikowych zestawach danych. Są one również wykonywane podobnie jak funkcje zdefiniowane przez użytkownika w klastrze usługi Databricks.
Poniższa aplikacja Scala używa interfejsu map()
API do modyfikowania liczby w kolumnie wynikowej na prefiksowany ciąg.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
W tym przykładzie użyto interfejsu map()
API, ale dotyczy to również innych typowych interfejsów API zestawu danych, takich jak filter()
, mapPartitions()
itp.